1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
// Copyright 2021 Fastly, Inc.
use anyhow::bail;
use anyhow::Result;
use async_channel::Sender;
use futures::SinkExt;
use log::*;
use prost::Message;
use std::os::unix::io::AsRawFd;
use tokio::net::UnixStream;
use tokio_stream::StreamExt;
use tokio_util::codec::Framed;
use dnstap_utils::dnstap;
use dnstap_utils::framestreams_codec::{self, Frame, FrameStreamsCodec};
/// Per-connection FrameStreams protocol handler. Reads delimited frames from the Unix socket
/// stream, decodes the protobuf payload, and then sends the protobuf object over a channel to a
/// [`crate::DnstapHandler`] for further processing.
pub struct FrameHandler {
/// The send side of the async channel, used by [`FrameHandler`]'s to send decoded dnstap
/// protobuf messages to the [`crate::DnstapHandler`]'s.
channel_sender: Sender<dnstap::Dnstap>,
/// The Unix stream to read frames from.
stream: UnixStream,
/// Identifying description of the connected `stream`.
stream_descr: String,
/// Counter of the number of bytes processed by this [`FrameHandler`].
count_data_bytes: usize,
/// Counter of the number of frames processed by this [`FrameHandler`].
count_data_frames: usize,
}
impl FrameHandler {
/// Create a new [`FrameHandler`] that reads from `stream` and writes decoded protobuf messages
/// to `channel_sender`.
pub fn new(stream: UnixStream, channel_sender: Sender<dnstap::Dnstap>) -> Self {
let stream_descr = format!("fd {}", stream.as_raw_fd());
FrameHandler {
stream,
stream_descr,
channel_sender,
count_data_bytes: 0,
count_data_frames: 0,
}
}
/// Set up the FrameStreams connection and processing the incoming data frames.
pub async fn run(&mut self) -> Result<()> {
info!(
"Accepted new Frame Streams connection on {}",
self.stream_descr
);
// Initialize the FrameStreams codec on the connected stream.
let mut framed = Framed::with_capacity(
&mut self.stream,
FrameStreamsCodec {},
framestreams_codec::FRAME_LENGTH_MAX,
);
// Process each frame from the connection.
while let Some(frame) = framed.next().await {
match frame {
Ok(frame) => {
match frame {
Frame::ControlReady(payload) => {
// Ready: This is the first control frame received from the sender.
// Send the Accept control frame.
//
// XXX: We mirror the content type(s) specified in the Ready control
// frame payload into the Accept control frame payload. Instead we
// should select a specific content type from the sender's list.
framed.send(Frame::ControlAccept(payload)).await?;
}
Frame::ControlAccept(_) => {
// Accept: This is the control frame that the receiver sends in
// response to the Ready frame. It is a protocol violation for a sender
// to send an Accept control frame.
bail!(
"{}: Protocol error: Sender sent ACCEPT frame",
self.stream_descr
);
}
Frame::ControlStart(payload) => {
// Start: This is the control frame that the sender sends in response
// to the Accept frame and indicates it will begin sending data frames
// of the type specified in the Start control frame payload.
//
// XXX: We should probably do something with the content type that the
// sender specifies in the Start control frame payload.
trace!(
"{}: START payload: {}",
self.stream_descr,
hex::encode(&payload)
);
}
Frame::ControlStop => {
// Stop: This is the control frame that the sender sends when it is
// done sending Data frames. Send the Finish frame acknowledging
// shutdown of the stream.
info!(
"{}: STOP received, processed {} data frames, {} data bytes",
self.stream_descr, self.count_data_frames, self.count_data_bytes,
);
framed.send(Frame::ControlFinish).await?;
// Shut the [`FrameHandler`] down.
return Ok(());
}
Frame::ControlFinish => {
// Protocol violation for a receiver to receive a Finish control frame.
bail!(
"{}: Protocol error: Sender sent FINISH frame",
self.stream_descr
);
}
Frame::ControlUnknown(_) => {
bail!(
"{}: Protocol error: Sender sent unknown control frame",
self.stream_descr
);
}
Frame::Data(mut payload) => {
// Accounting.
crate::metrics::DATA_FRAMES.inc();
crate::metrics::DATA_BYTES.inc_by(payload.len() as u64);
self.count_data_bytes += payload.len();
self.count_data_frames += 1;
// Decode the protobuf message.
match dnstap::Dnstap::decode(&mut payload) {
// The message was successfully parsed, send it to a
// [`DnstapHandler`] for further processing.
Ok(d) => match self.channel_sender.send(d).await {
Ok(_) => {}
Err(e) => {
bail!("{}: Unable to send dnstap protobuf message to channel: {}",
self.stream_descr, e);
}
},
// The payload failed to parse.
Err(e) => {
bail!(
"{}: Protocol error: Decoding dnstap protobuf message: {}, payload: {}",
self.stream_descr,
e,
hex::encode(&payload)
);
}
}
}
}
}
Err(e) => {
bail!("{}: Protocol error: {}", self.stream_descr, e);
}
}
}
Ok(())
}
}