crowdstrike_cloudproto/services/ts/socket.rs
use crate::framing::{CloudProtoError, CloudProtoPacket, CloudProtoSocket, CloudProtoVersion};
use crate::services::ts::event::EVT_HDR_LEN;
use crate::services::ts::{AgentIdStatus, Event, TsConnectInfo, TsPacketKind};
use crate::services::CloudProtoMagic;
use futures_util::{Sink, SinkExt, Stream, StreamExt};
use std::io::Cursor;
use std::pin::Pin;
use std::task::{ready, Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::{debug, error, trace, warn};

const HDR_TXID_SIZE: usize = std::mem::size_of::<u64>();
// Values observed from the official client.
// The TS server returns large, quickly incrementing TXIDs, but these values work fine.
const FIRST_TXID: u64 = 0x200;
const TXID_INCREMENT: u64 = 0x100;
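// With these, the txids we send end up as 0x200, 0x300, 0x400, ... (incremented in `start_send` below).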

/// Async socket used to stream [`Event`](Event)s with the TS service
///
/// You need to provide a valid Crowdstrike Customer ID (CID) to authenticate with the server.
/// The TS server checks that this CID belongs to a valid customer and will immediately close the socket otherwise.
///
/// You should have been provided with a "CCID" when installing the Falcon Sensor,
/// which looks something like "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA-BB".
/// The CID is the first part before the "-BB".
///
/// After installation, you can still find your CID in binary form in the "falconstore" file,
/// saved as a 16-byte binary blob, right after the UTF-16 literal "CU".
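///
/// # Example
///
/// A minimal usage sketch. How you obtain the underlying `CloudProtoSocket` and fill in
/// `TsConnectInfo` is out of scope here, so `cloudproto_io` and `info` below are placeholders:
///
/// ```ignore
/// use futures_util::{SinkExt, StreamExt};
///
/// let sock = TsEventSocket::connect(cloudproto_io, info).await?;
///
/// // Split the socket so the receive side is always polled from its own task,
/// // which also lets the socket keep ACKing incoming events.
/// let (mut tx, mut rx) = sock.split();
/// tokio::spawn(async move {
///     while let Some(ev) = rx.next().await {
///         // `ev` is a Result<Event, CloudProtoError>
///         let _ = ev;
///     }
/// });
///
/// // The write half can now send events independently:
/// // tx.send(my_event).await?;
/// ```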
pub struct TsEventSocket<IO: AsyncRead + AsyncWrite> {
    io: CloudProtoSocket<IO>,
    /// Txid we will use for the next event we send
    next_txid: u64,

    /// Txid of a received event whose ACK we haven't managed to send yet
    unacked_txid: Option<u64>,
    /// The received event itself, buffered until its ACK has been handed to the IO layer
    unacked_event: Option<Event>,
}

impl<IO> TsEventSocket<IO>
where
    IO: AsyncRead + AsyncWrite,
{
    pub(crate) fn new(io: CloudProtoSocket<IO>) -> Self {
        Self {
            io,
            next_txid: FIRST_TXID,
            unacked_txid: None,
            unacked_event: None,
        }
    }

    pub async fn connect(
        mut io: CloudProtoSocket<IO>,
        info: TsConnectInfo,
    ) -> Result<Self, CloudProtoError> {
        let mut payload = Vec::with_capacity(4 * 16 + 8);
        payload.extend_from_slice(&info.cid);
        payload.extend_from_slice(&info.unk0);
        payload.extend_from_slice(&info.aid);
        payload.extend_from_slice(&info.bootid);
        payload.extend_from_slice(&info.pt);
        let pkt = CloudProtoPacket {
            magic: CloudProtoMagic::TS,
            kind: TsPacketKind::Connect.into(),
            version: CloudProtoVersion::Connect,
            payload,
        };
        io.send(pkt).await?;

        let reply = match io.next().await {
            Some(pkt) => pkt?,
            None => {
                return Err(CloudProtoError::ClosedByPeer(
                    "TS server closed connection".into(),
                ))
            }
        };
        // Log the connect reply for debugging, since we don't otherwise return the payload in errors
        trace!("Received TS connect reply: {}", hex::encode(&reply.payload));

        if reply.magic != CloudProtoMagic::TS {
            return Err(CloudProtoError::BadMagic(reply.magic, CloudProtoMagic::TS));
        }
        if reply.kind != TsPacketKind::ConnectionEstablished {
            error!(
                "Bad TS connect reply kind: {:X?}, payload: {}",
                reply,
                hex::encode(&reply.payload)
            );
            return Err(CloudProtoError::WrongConnectionPacketKind(
                reply.kind,
                TsPacketKind::ConnectionEstablished.into(),
            ));
        }
        if reply.version != CloudProtoVersion::Normal {
            error!(
                "Bad TS connect reply version: {:X?}, payload: {}",
                reply,
                hex::encode(&reply.payload)
            );
            return Err(CloudProtoError::BadVersion(
                reply.version,
                CloudProtoVersion::Normal,
            ));
        }

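        // The reply payload should be 1 status byte, followed by the 16-byte AgentID assigned by the server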
        if reply.payload.len() != 17 {
            warn!("TsEventSocket connect reply has unexpected size, continuing anyway")
        } else if reply.payload[0] == AgentIdStatus::Unchanged as u8 {
            debug!(
                received_aid = hex::encode(&reply.payload[1..]),
                "TS socket connected, AgentID unchanged",
            );
            if info.aid[..] != reply.payload[1..] {
                warn!("TS server says to keep our AgentID, but replied with a different one!");
            }
        } else if reply.payload[0] == AgentIdStatus::Changed as u8 {
            debug!(
                received_aid = hex::encode(&reply.payload[1..]),
                "TS socket connected, AgentID has changed",
            );
            if info.aid[..] == reply.payload[1..] {
                warn!("TS server says to change our AgentID, but replied with the same one!");
            }
        } else {
            warn!(
                "Unexpected value from TS server when checking whether the AgentID changed: {:#x}",
                reply.payload[0]
            )
        }

        Ok(Self::new(io))
    }
}

impl<IO> Stream for TsEventSocket<IO>
where
    IO: AsyncRead + AsyncWrite,
{
    type Item = Result<Event, CloudProtoError>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();

        // (Shh, don't tell anyone, but this is a stealth goto we take just once after receiving an event!)
        'process_pending_acks: loop {
            if let Some(txid) = &this.unacked_txid {
                assert!(this.unacked_event.is_some());
                ready!(this.io.poll_ready_unpin(cx))?;

                this.io.start_send_unpin(CloudProtoPacket {
                    magic: CloudProtoMagic::TS,
                    kind: TsPacketKind::Ack.into(),
                    version: CloudProtoVersion::Normal,
                    payload: txid.to_be_bytes().to_vec(),
                })?;
                let _ = this.unacked_txid.take();

                // If the ACK doesn't finish leaving here, that's fine,
                // we also flush below when our io's recv side is still Pending
                ready!(this.io.poll_flush_unpin(cx))?;
            }
            if let Some(ev) = this.unacked_event.take() {
                assert!(this.unacked_txid.is_none());
                return Poll::Ready(Some(Ok(ev)));
            }

            '_receive_packets: loop {
                let pkt = match this.io.poll_next_unpin(cx)? {
                    Poll::Ready(Some(pkt)) => pkt,
                    Poll::Ready(None) => return Poll::Ready(None),
                    Poll::Pending => {
                        // If the user is only polling the read side, some of our ACKs might never finish flushing,
                        // the server would stop sending, and this poll_next would be Pending forever :)
                        // So if we have nothing left but the user is still reading, it's a good time to flush our send side
                        ready!(this.io.poll_flush_unpin(cx))?;
                        return Poll::Pending; // We still have a queued wake on the read side
                    }
                };

                if pkt.kind == TsPacketKind::Ack {
                    // This would be the place to update a queue of un-ACKed inflight packets,
                    // so we could have backpressure, and retransmit packets after some time.
                    //
                    // We don't do any of that, because Crowdstrike's client doesn't either,
                    // and it's unreasonably hard to be the only side "following TCP rules"
                    // if the other side assumes packets it sends can never be dropped.
                    //
                    // See the other (large) comment below on the send side for more context.
                    if pkt.payload.len() == 8 {
                        let txid = u64::from_be_bytes(pkt.payload[..].try_into().unwrap());
                        trace!("Received ACK for event txid {:#x}", txid);
                    } else {
                        error!(
                            "Received ACK packet with invalid size: {:#x}",
                            pkt.payload.len()
                        )
                    }
                    continue;
                } else if pkt.kind == TsPacketKind::Event {
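                    // Event packets start with an 8-byte big-endian txid, followed by the serialized event itself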
                    if pkt.payload.len() < HDR_TXID_SIZE + EVT_HDR_LEN {
                        return Poll::Ready(Some(Err(CloudProtoError::PayloadTooShort(
                            pkt.payload.len(),
                            HDR_TXID_SIZE + EVT_HDR_LEN,
                        ))));
                    }
                    let txid = u64::from_be_bytes(pkt.payload[..HDR_TXID_SIZE].try_into().unwrap());
                    let ev = Event::from_read(&mut Cursor::new(&pkt.payload[HDR_TXID_SIZE..]))?;

                    // We ACK received events before returning them, to make sure we keep getting polled until the ACK is sent
                    // So we have to buffer the event and its txid, in case we get Poll::Pending while trying to ACK it
                    trace!(
                        "Received event with txid {:#x}, preparing to send ACK",
                        txid
                    );
                    assert!(this.unacked_txid.is_none());
                    this.unacked_txid = Some(txid);
                    assert!(this.unacked_event.is_none());
                    this.unacked_event = Some(ev);
                    continue 'process_pending_acks;
                } else {
                    // Hoping this was a non-essential packet and continuing happily...
                    warn!(
                        "Received unexpected CloudProto packet kind: {:#x}",
                        pkt.kind
                    );
                    trace!("Unexpected packet payload: {}", hex::encode(&pkt.payload));
                }
            }
        }
    }
}

impl<IO> Sink<Event> for TsEventSocket<IO>
where
    IO: AsyncRead + AsyncWrite,
{
    type Error = std::io::Error;

    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // If we wanted to track ACKs for our tx, here we would need to block when the
        // queue of inflight un-ACKed events we're tracking becomes full.
        // But that queue can only shrink when we *receive* ACKs, so the TX side would depend
        // on Rust users of the lib also polling the RX side regularly, *while* they send.
        // If a user did some ts_sock.send().await in a loop without ever receiving, we'd DEADLOCK.
        //
        // So we could poll the RX internally from TX when our send queue is full, until we get ACKs.
        // But it's a single RX stream. We might receive real Events instead, all while inside send.
        // So we could stash them in an RX queue until it's full, but then what should send() do?
        //
        // An unsatisfying option is to just return an error from start_send() when that happens,
        // because your RX queue is full, maybe no one's polling RX, and we don't want to deadlock.
        // But that error could also happen in normal usage if the TX races faster than the RX.
        // And in Rust, Sinks that return an error are generally expected to be permanently closed.
        // Rust sinks don't normally do "oops you need to poll the read side a little!" errors :)
        //
        // This is where TX would just drop packets on the floor when the RX queue is full,
        // and that ought to be fine! If we drop them they won't be ACKed, and the other side
        // will just retransmit them to us later. A little bit of inefficiency on the RX side,
        // but this only happens when the RX queue is full, so that'd be reasonable.
        //
        // Except, as it turns out, the official Crowdstrike client does **none of that**!
        //
        // It *looks like* it has code to do it! It *seems* to track inflight packets,
        // walks all sorts of linked lists to process inflight packets and the ACKs for them,
        // its send worker thread has some kind of "send window" and "backpressure"-like routines.
        //
        // But in practice, it completely ignores ACKs. It always just keeps forging ahead,
        // so we can't rely on being able to drop packets when our RX queue is full, they'd be lost.
        // This is the problem with being the only side trying to uphold guarantees, you only
        // get the constraints but you don't get to rely on the other side following them ^^'
        //
        // This is still work-around-able, either by telling users to always split() this socket
        // and always poll the RX side in an async task, so we get to just block in TX and be done.
        // This is pretty much how Crowdstrike's client is architected too.
        // It has an RX and TX worker, and reality aside, in theory it should just work like this.
        //
        // Or, we could do "engineering" and add a 100ms timer since the last RX poll.
        // TX would only try to go receive ACKs itself after 100ms pass without any RX poll,
        // it'd put Events in the RX queue, and return an error when that queue is full.
        // Because of the timer this can't happen in normal usage anymore.
        // With this, if you are polling RX from time to time, TX will just wait for you.
        // If you're only sending because you know the other side won't reply with Events,
        // TX *will* have to do reads, but it will only see ACKs, won't have to fill the RX queue,
        // and so you will not see the error in practice.
        // Only if you keep sending without having an RX worker, and the other side actually
        // replies with Events, would we return the error, because that's still better
        // than a deadlock or just dropping received packets that the other side won't retransmit.
        //
        // But, instead of all that, we just do as the Romans do and happily ignore received ACKs.
        // Firstly because it's completely unnecessary, the Crowdstrike server already has to
        // deal with a client that doesn't follow ACKs at all, so we're "bug-for-bug" compatible.
        //
        // Secondly because CLOUDPROTO is *always* carried over TLS, so ACKs were never necessary
        // in the first place! Even if you do TLS over UDP for some reason, you will already have
        // done retransmissions below the TLS layer, because TLS won't just let you drop packets in
        // the middle of an encrypted stream. The crypto layer tends to not like that idea.
        //
        // It's interesting that their client _almost_ implements all of this machinery,
        // but then gives up in practice. Maybe it was just too complex and/or annoying to debug?
        // I'm just curious why they still ship all of this code that doesn't run in practice...
        // For instance, the client has a check where if a function returns some status code,
        // that means a duplicate ACK was received. Except that status code is _never_ returned
        // by that function, in fact it's found nowhere else in the code according to IDA :)
        //
        // A lot of the client code is like this, half-implemented stuff. But maybe we should
        // really be impressed by this surely purposeful obfuscation and misdirection.
        // (...almost as effective as having to follow all those damn C++ virtual calls everywhere!)
        let this = self.get_mut();
        this.io.poll_ready_unpin(cx)
    }

    fn start_send(self: Pin<&mut Self>, ev: Event) -> Result<(), Self::Error> {
        let this = self.get_mut();

        let mut buf = Vec::with_capacity(HDR_TXID_SIZE + EVT_HDR_LEN + ev.data.len());
        buf.extend_from_slice(&this.next_txid.to_be_bytes());
        this.next_txid += TXID_INCREMENT;
        match ev.into_write(&mut buf) {
            Ok(_) => {}
            Err(CloudProtoError::Io { source }) => return Err(source),
            Err(e) => {
                return Err(std::io::Error::new(
                    std::io::ErrorKind::Other,
                    format!("Unexpected error while sending Event: {}", e),
                ))
            }
        }

        this.io.start_send_unpin(CloudProtoPacket {
            magic: CloudProtoMagic::TS,
            kind: TsPacketKind::Event.into(),
            version: CloudProtoVersion::Normal,
            payload: buf,
        })
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.get_mut().io.poll_flush_unpin(cx)
    }

    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.get_mut().io.poll_close_unpin(cx)
    }
}