1#![warn(missing_docs)]
2mod connection;
49mod exchange;
50pub mod h1;
51mod traits;
52
53use std::marker::PhantomData;
54
55pub use connection::Protocol;
56use connection::{Connection as Conn, DataChunk};
57use dashmap::DashMap;
58pub use exchange::{CollationEvent, CollatorConfig, Exchange, MessageMetadata, ParsedHttpMessage};
59pub use h1::{HttpRequest, HttpResponse};
60use h2session::{
61 H2ConnectionState,
62 StreamId,
63 TimestampNs,
64 is_http2_preface,
65 looks_like_http2_frame,
66};
67pub use traits::{DataEvent, Direction};
68
/// Buffer size limit of 16 KiB (16384).
///
/// NOTE(review): presumably measured in bytes; confirm against how callers use it.
pub const MAX_BUF_SIZE: usize = 16 * 1024;
71
72pub struct Collator<E: DataEvent> {
78 connections: DashMap<u64, Conn>,
80 ssl_connections: DashMap<u32, Conn>,
82 config: CollatorConfig,
84 _phantom: PhantomData<E>,
86}
87
88impl<E: DataEvent> Default for Collator<E> {
89 fn default() -> Self {
90 Self::new()
91 }
92}
93
94impl<E: DataEvent> Collator<E> {
95 pub fn new() -> Self {
98 Self::with_config(CollatorConfig::default())
99 }
100
101 pub fn with_config(config: CollatorConfig) -> Self {
103 Self {
104 connections: DashMap::new(),
105 ssl_connections: DashMap::new(),
106 config,
107 _phantom: PhantomData,
108 }
109 }
110
111 pub fn with_max_buf_size(max_buf_size: usize) -> Self {
113 Self::with_config(CollatorConfig {
114 max_buf_size,
115 ..Default::default()
116 })
117 }
118
119 pub fn config(&self) -> &CollatorConfig {
121 &self.config
122 }
123
124 pub fn add_event(&self, event: E) -> Vec<CollationEvent> {
134 let direction = event.direction();
136 let timestamp_ns = TimestampNs(event.timestamp_ns());
137 let conn_id = event.connection_id();
138 let process_id = event.process_id();
139 let remote_port = event.remote_port();
140 let is_empty = event.payload().is_empty();
141
142 if is_empty {
143 return Vec::new();
144 }
145
146 if direction == Direction::Other {
148 return Vec::new();
149 }
150
151 let data = event.into_payload();
154
155 let chunk = DataChunk {
156 data,
157 timestamp_ns,
158 direction,
159 };
160
161 if conn_id != 0 {
164 let mut conn = self
165 .connections
166 .entry(conn_id)
167 .or_insert_with(|| Conn::new(process_id, remote_port));
168 Self::process_event_for_conn(
169 &mut conn,
170 chunk,
171 direction,
172 timestamp_ns,
173 remote_port,
174 conn_id,
175 process_id,
176 &self.config,
177 )
178 } else {
179 let mut conn = self
180 .ssl_connections
181 .entry(process_id)
182 .or_insert_with(|| Conn::new(process_id, remote_port));
183 Self::process_event_for_conn(
184 &mut conn,
185 chunk,
186 direction,
187 timestamp_ns,
188 remote_port,
189 conn_id,
190 process_id,
191 &self.config,
192 )
193 }
194 }
195
196 #[allow(clippy::too_many_arguments)]
199 fn process_event_for_conn(
200 conn: &mut Conn,
201 chunk: DataChunk,
202 direction: Direction,
203 timestamp_ns: TimestampNs,
204 remote_port: u16,
205 conn_id: u64,
206 process_id: u32,
207 config: &CollatorConfig,
208 ) -> Vec<CollationEvent> {
209 let buf: &[u8] = &chunk.data;
210
211 conn.last_activity_ns = timestamp_ns;
212 if remote_port != 0 && conn.remote_port.is_none() {
213 conn.remote_port = Some(remote_port);
214 }
215
216 if conn.protocol == Protocol::Unknown {
218 conn.protocol = detect_protocol(buf);
219 }
220
221 if conn.protocol != Protocol::Unknown {
223 let incoming_protocol = detect_protocol(buf);
224 if incoming_protocol != Protocol::Unknown && incoming_protocol != conn.protocol {
225 reset_connection_for_protocol_change(conn, incoming_protocol);
226 }
227 }
228
229 match direction {
231 Direction::Write => {
232 conn.request_body_size += buf.len();
233 if conn.request_body_size > config.max_body_size {
234 reset_connection_body_limit(conn);
235 return Vec::new();
236 }
237
238 if conn.protocol == Protocol::Http1 || conn.protocol == Protocol::Unknown {
240 conn.h1_write_buffer.extend_from_slice(buf);
241 }
242 conn.request_chunks.push(chunk);
243
244 match conn.protocol {
245 Protocol::Http1 if !conn.h1_write_parsed => {
246 try_parse_http1_write_chunks(conn);
247 },
248 Protocol::Http2 => {
249 parse_http2_chunks(conn, direction);
250 },
251 Protocol::Unknown => {
254 try_parse_http1_unknown_write(conn);
255 },
256 _ => {},
257 }
258
259 if is_request_complete(conn) {
262 conn.request_complete = true;
263 }
264 if is_response_complete(conn) {
265 conn.response_complete = true;
266 }
267 },
268 Direction::Read => {
269 conn.response_body_size += buf.len();
270 if conn.response_body_size > config.max_body_size {
271 reset_connection_body_limit(conn);
272 return Vec::new();
273 }
274
275 if conn.protocol == Protocol::Http1 || conn.protocol == Protocol::Unknown {
277 conn.h1_read_buffer.extend_from_slice(buf);
278 }
279 conn.response_chunks.push(chunk);
280
281 match conn.protocol {
282 Protocol::Http1 if !conn.h1_read_parsed => {
283 try_parse_http1_read_chunks(conn);
284 },
285 Protocol::Http2 => {
286 parse_http2_chunks(conn, direction);
287 },
288 Protocol::Unknown => {
291 try_parse_http1_unknown_read(conn);
292 },
293 _ => {},
294 }
295
296 if is_request_complete(conn) {
299 conn.request_complete = true;
300 }
301 if is_response_complete(conn) {
302 conn.response_complete = true;
303 }
304 },
305 Direction::Other => {
306 return Vec::new();
307 },
308 }
309
310 if conn.protocol == Protocol::Http2 && find_complete_h2_stream(conn).is_some() {
312 conn.request_complete = true;
313 conn.response_complete = true;
314 }
315
316 let mut events = Vec::new();
317
318 if config.emit_messages {
319 emit_message_events(conn, conn_id, process_id, &mut events);
320 }
321
322 if config.emit_exchanges && conn.request_complete && conn.response_complete {
323 if let Some(exchange) = build_exchange(conn) {
324 events.push(CollationEvent::Exchange(exchange));
325 }
326 reset_connection_after_exchange(conn);
327 }
328
329 events
330 }
331
332 pub fn cleanup(&self, current_time_ns: TimestampNs) {
337 self.connections.retain(|_, conn| {
338 current_time_ns.saturating_sub(conn.last_activity_ns) < self.config.timeout_ns
339 });
340 self.ssl_connections.retain(|_, conn| {
341 current_time_ns.saturating_sub(conn.last_activity_ns) < self.config.timeout_ns
342 });
343
344 for mut entry in self.connections.iter_mut() {
346 entry.h2_write_state.evict_stale_streams(current_time_ns);
347 entry.h2_read_state.evict_stale_streams(current_time_ns);
348 }
349 for mut entry in self.ssl_connections.iter_mut() {
350 entry.h2_write_state.evict_stale_streams(current_time_ns);
351 entry.h2_read_state.evict_stale_streams(current_time_ns);
352 }
353 }
354
355 pub fn remove_connection(&self, connection_id: u64, process_id: u32) {
360 if connection_id != 0 {
361 self.connections.remove(&connection_id);
362 } else {
363 self.ssl_connections.remove(&process_id);
364 }
365 }
366
367 pub fn close_connection(&self, connection_id: u64, process_id: u32) -> Vec<CollationEvent> {
375 let events = if connection_id != 0 {
376 match self.connections.get_mut(&connection_id) {
377 Some(mut guard) => {
378 finalize_and_emit(&mut guard, connection_id, process_id, &self.config)
379 },
380 None => Vec::new(),
381 }
382 } else {
383 match self.ssl_connections.get_mut(&process_id) {
384 Some(mut guard) => {
385 finalize_and_emit(&mut guard, connection_id, process_id, &self.config)
386 },
387 None => Vec::new(),
388 }
389 };
390
391 if connection_id != 0 {
393 self.connections.remove(&connection_id);
394 } else {
395 self.ssl_connections.remove(&process_id);
396 }
397
398 events
399 }
400}
401
402fn finalize_and_emit(
404 conn: &mut Conn,
405 connection_id: u64,
406 process_id: u32,
407 config: &CollatorConfig,
408) -> Vec<CollationEvent> {
409 if conn.protocol == Protocol::Http1
411 && conn.h1_response.is_none()
412 && !conn.h1_read_buffer.is_empty()
413 {
414 let timestamp = conn
415 .response_chunks
416 .first()
417 .map(|c| c.timestamp_ns)
418 .unwrap_or(TimestampNs(0));
419 conn.h1_response = h1::try_finalize_http1_response(&conn.h1_read_buffer, timestamp);
420 if conn.h1_response.is_some() {
421 conn.response_complete = true;
422 }
423 }
424
425 let mut events = Vec::new();
426
427 if config.emit_messages {
428 emit_message_events(conn, connection_id, process_id, &mut events);
429 }
430
431 if config.emit_exchanges
432 && conn.request_complete
433 && conn.response_complete
434 && let Some(exchange) = build_exchange(conn)
435 {
436 events.push(CollationEvent::Exchange(exchange));
437 }
438
439 events
440}
441
442fn emit_message_events(
445 conn: &mut Conn,
446 conn_id: u64,
447 process_id: u32,
448 events: &mut Vec<CollationEvent>,
449) {
450 match conn.protocol {
451 Protocol::Http1 => {
452 if let Some(ref req) = conn.h1_request
454 && !conn.h1_request_emitted
455 {
456 let metadata = MessageMetadata {
457 connection_id: conn_id,
458 process_id,
459 timestamp_ns: req.timestamp_ns,
460 stream_id: None,
461 remote_port: conn.remote_port,
462 protocol: conn.protocol,
463 };
464 events.push(CollationEvent::Message {
465 message: ParsedHttpMessage::Request(req.clone()),
466 metadata,
467 });
468 conn.h1_request_emitted = true;
469 }
470
471 if let Some(ref resp) = conn.h1_response
473 && !conn.h1_response_emitted
474 {
475 let metadata = MessageMetadata {
476 connection_id: conn_id,
477 process_id,
478 timestamp_ns: resp.timestamp_ns,
479 stream_id: None,
480 remote_port: conn.remote_port,
481 protocol: conn.protocol,
482 };
483 events.push(CollationEvent::Message {
484 message: ParsedHttpMessage::Response(resp.clone()),
485 metadata,
486 });
487 conn.h1_response_emitted = true;
488 }
489 },
490 Protocol::Http2 => {
491 for (&stream_id, msg) in &conn.pending_requests {
493 if !conn.h2_emitted_requests.contains(&stream_id)
494 && let Some(req) = msg.to_http_request()
495 {
496 let metadata = MessageMetadata {
497 connection_id: conn_id,
498 process_id,
499 timestamp_ns: msg.end_stream_timestamp_ns,
500 stream_id: Some(stream_id),
501 remote_port: conn.remote_port,
502 protocol: conn.protocol,
503 };
504 events.push(CollationEvent::Message {
505 message: ParsedHttpMessage::Request(req),
506 metadata,
507 });
508 }
509 }
510 conn.h2_emitted_requests
512 .extend(conn.pending_requests.keys().copied());
513
514 for (&stream_id, msg) in &conn.pending_responses {
516 if !conn.h2_emitted_responses.contains(&stream_id)
517 && let Some(resp) = msg.to_http_response()
518 {
519 let metadata = MessageMetadata {
520 connection_id: conn_id,
521 process_id,
522 timestamp_ns: msg.first_frame_timestamp_ns,
523 stream_id: Some(stream_id),
524 remote_port: conn.remote_port,
525 protocol: conn.protocol,
526 };
527 events.push(CollationEvent::Message {
528 message: ParsedHttpMessage::Response(resp),
529 metadata,
530 });
531 }
532 }
533 conn.h2_emitted_responses
535 .extend(conn.pending_responses.keys().copied());
536 },
537 Protocol::Unknown => {},
538 }
539}
540
541fn reset_connection_after_exchange(conn: &mut Conn) {
543 conn.request_complete = false;
544 conn.response_complete = false;
545
546 if conn.protocol == Protocol::Http1 {
547 conn.request_chunks.clear();
549 conn.response_chunks.clear();
550 conn.h1_request = None;
551 conn.h1_response = None;
552 conn.h1_write_parsed = false;
553 conn.h1_read_parsed = false;
554 conn.h1_request_emitted = false;
555 conn.h1_response_emitted = false;
556 conn.h1_write_buffer.clear();
557 conn.h1_read_buffer.clear();
558 conn.protocol = Protocol::Unknown;
559 } else if conn.protocol == Protocol::Http2 {
560 if conn.pending_requests.is_empty() && conn.pending_responses.is_empty() {
566 conn.request_chunks.clear();
567 conn.response_chunks.clear();
568 conn.h2_write_state.clear_buffer();
569 conn.h2_read_state.clear_buffer();
570 }
571 }
572
573 conn.request_body_size = 0;
575 conn.response_body_size = 0;
576}
577
578fn reset_connection_body_limit(conn: &mut Conn) {
581 conn.request_chunks.clear();
582 conn.response_chunks.clear();
583 conn.h1_write_buffer.clear();
584 conn.h1_read_buffer.clear();
585 conn.h1_request = None;
586 conn.h1_response = None;
587 conn.h1_write_parsed = false;
588 conn.h1_read_parsed = false;
589 conn.h1_request_emitted = false;
590 conn.h1_response_emitted = false;
591 conn.h2_write_state = H2ConnectionState::new();
592 conn.h2_read_state = H2ConnectionState::new();
593 conn.pending_requests.clear();
594 conn.pending_responses.clear();
595 conn.h2_emitted_requests.clear();
596 conn.h2_emitted_responses.clear();
597 conn.ready_streams.clear();
598 conn.request_complete = false;
599 conn.response_complete = false;
600 conn.request_body_size = 0;
601 conn.response_body_size = 0;
602 conn.protocol = Protocol::Unknown;
603}
604
605fn reset_connection_for_protocol_change(conn: &mut Conn, new_protocol: Protocol) {
608 conn.request_chunks.clear();
609 conn.response_chunks.clear();
610 conn.h1_write_buffer.clear();
611 conn.h1_read_buffer.clear();
612 conn.h1_request = None;
613 conn.h1_response = None;
614 conn.h1_write_parsed = false;
615 conn.h1_read_parsed = false;
616 conn.h1_request_emitted = false;
617 conn.h1_response_emitted = false;
618 conn.h2_write_state = H2ConnectionState::new();
619 conn.h2_read_state = H2ConnectionState::new();
620 conn.pending_requests.clear();
621 conn.pending_responses.clear();
622 conn.h2_emitted_requests.clear();
623 conn.h2_emitted_responses.clear();
624 conn.ready_streams.clear();
625 conn.request_complete = false;
626 conn.response_complete = false;
627 conn.request_body_size = 0;
628 conn.response_body_size = 0;
629 conn.protocol = new_protocol;
630}
631
632pub fn detect_protocol(data: &[u8]) -> Protocol {
638 if is_http2_preface(data) {
640 return Protocol::Http2;
641 }
642
643 if looks_like_http2_frame(data) {
645 return Protocol::Http2;
646 }
647
648 if h1::is_http1_request(data) || h1::is_http1_response(data) {
650 return Protocol::Http1;
651 }
652
653 Protocol::Unknown
654}
655
656fn parse_http2_chunks(conn: &mut Conn, direction: Direction) {
665 let last_chunk_is_preface = match direction {
670 Direction::Write => conn
671 .request_chunks
672 .last()
673 .is_some_and(|c| is_http2_preface(&c.data)),
674 Direction::Read => conn
675 .response_chunks
676 .last()
677 .is_some_and(|c| is_http2_preface(&c.data)),
678 Direction::Other => false,
679 };
680 let current_state_has_preface = match direction {
681 Direction::Write => conn.h2_write_state.preface_received,
682 Direction::Read => conn.h2_read_state.preface_received,
683 Direction::Other => false,
684 };
685
686 if last_chunk_is_preface && current_state_has_preface {
687 conn.h2_write_state = H2ConnectionState::new();
688 conn.h2_read_state = H2ConnectionState::new();
689 conn.pending_requests.clear();
690 conn.pending_responses.clear();
691 conn.h2_emitted_requests.clear();
692 conn.h2_emitted_responses.clear();
693 conn.ready_streams.clear();
694 }
695
696 let (chunks, h2_state) = match direction {
697 Direction::Write => (&conn.request_chunks, &mut conn.h2_write_state),
698 Direction::Read => (&conn.response_chunks, &mut conn.h2_read_state),
699 Direction::Other => return,
700 };
701
702 let chunk = match chunks.last() {
703 Some(c) => c,
704 None => return,
705 };
706
707 let _ = h2_state.feed(&chunk.data, chunk.timestamp_ns);
709
710 while let Some((stream_id, msg)) = h2_state.try_pop() {
713 if msg.is_request() {
714 conn.pending_requests.insert(stream_id, msg);
715 if conn.pending_responses.contains_key(&stream_id) {
716 conn.ready_streams.insert(stream_id);
717 }
718 } else if msg.is_response() {
719 conn.pending_responses.insert(stream_id, msg);
720 if conn.pending_requests.contains_key(&stream_id) {
721 conn.ready_streams.insert(stream_id);
722 }
723 }
724 }
725}
726
727fn find_complete_h2_stream(conn: &Conn) -> Option<StreamId> {
730 conn.ready_streams.iter().next().copied()
731}
732
733fn try_parse_http1_write_chunks(conn: &mut Conn) {
736 let timestamp = conn
737 .request_chunks
738 .last()
739 .map(|c| c.timestamp_ns)
740 .unwrap_or(TimestampNs(0));
741 if let Some(req) = h1::try_parse_http1_request(&conn.h1_write_buffer, timestamp) {
742 conn.h1_request = Some(req);
743 conn.h1_write_parsed = true;
744 } else if let Some(resp) = h1::try_parse_http1_response(&conn.h1_write_buffer, timestamp) {
745 conn.h1_response = Some(resp);
746 conn.h1_write_parsed = true;
747 }
748}
749
750fn try_parse_http1_read_chunks(conn: &mut Conn) {
753 let timestamp = conn
754 .response_chunks
755 .first()
756 .map(|c| c.timestamp_ns)
757 .unwrap_or(TimestampNs(0));
758 if let Some(resp) = h1::try_parse_http1_response(&conn.h1_read_buffer, timestamp) {
759 conn.h1_response = Some(resp);
760 conn.h1_read_parsed = true;
761 } else if let Some(req) = h1::try_parse_http1_request(&conn.h1_read_buffer, timestamp) {
762 conn.h1_request = Some(req);
763 conn.h1_read_parsed = true;
764 }
765}
766
767fn try_parse_http1_unknown_write(conn: &mut Conn) {
770 let timestamp = conn
771 .request_chunks
772 .last()
773 .map(|c| c.timestamp_ns)
774 .unwrap_or(TimestampNs(0));
775 if let Some(req) = h1::try_parse_http1_request(&conn.h1_write_buffer, timestamp) {
776 conn.protocol = Protocol::Http1;
777 conn.h1_request = Some(req);
778 conn.h1_write_parsed = true;
779 } else if let Some(resp) = h1::try_parse_http1_response(&conn.h1_write_buffer, timestamp) {
780 conn.protocol = Protocol::Http1;
781 conn.h1_response = Some(resp);
782 conn.h1_write_parsed = true;
783 }
784}
785
786fn try_parse_http1_unknown_read(conn: &mut Conn) {
789 let timestamp = conn
790 .response_chunks
791 .first()
792 .map(|c| c.timestamp_ns)
793 .unwrap_or(TimestampNs(0));
794 if let Some(resp) = h1::try_parse_http1_response(&conn.h1_read_buffer, timestamp) {
795 conn.protocol = Protocol::Http1;
796 conn.h1_response = Some(resp);
797 conn.h1_read_parsed = true;
798 } else if let Some(req) = h1::try_parse_http1_request(&conn.h1_read_buffer, timestamp) {
799 conn.protocol = Protocol::Http1;
800 conn.h1_request = Some(req);
801 conn.h1_read_parsed = true;
802 }
803}
804
805fn is_request_complete(conn: &Conn) -> bool {
806 match conn.protocol {
807 Protocol::Http1 => conn.h1_request.is_some(),
808 Protocol::Http2 => find_complete_h2_stream(conn).is_some(),
809 Protocol::Unknown => false,
810 }
811}
812
813fn is_response_complete(conn: &Conn) -> bool {
814 match conn.protocol {
815 Protocol::Http1 => conn.h1_response.is_some(),
816 Protocol::Http2 => find_complete_h2_stream(conn).is_some(),
817 Protocol::Unknown => false,
818 }
819}
820
821fn build_exchange(conn: &mut Conn) -> Option<Exchange> {
822 let (request, response, stream_id, latency_ns) = match conn.protocol {
823 Protocol::Http1 => {
824 let req = conn.h1_request.take()?;
826 let resp = conn.h1_response.take()?;
827 let latency = resp.timestamp_ns.saturating_sub(req.timestamp_ns);
828 (req, resp, None, latency)
829 },
830 Protocol::Http2 => {
831 let sid = find_complete_h2_stream(conn)?;
832 let msg_req = conn.pending_requests.remove(&sid)?;
833 let msg_resp = conn.pending_responses.remove(&sid)?;
834
835 conn.h2_emitted_requests.remove(&sid);
837 conn.h2_emitted_responses.remove(&sid);
838 conn.ready_streams.remove(&sid);
839
840 let request_complete_time = msg_req.end_stream_timestamp_ns;
844 let response_start_time = msg_resp.first_frame_timestamp_ns;
845
846 let req = msg_req.into_http_request()?;
847 let resp = msg_resp.into_http_response()?;
848
849 let latency = response_start_time.saturating_sub(request_complete_time);
850 (req, resp, Some(sid), latency)
851 },
852 Protocol::Unknown => return None,
853 };
854
855 Some(Exchange {
856 request,
857 response,
858 latency_ns,
859 protocol: conn.protocol,
860 process_id: conn.process_id,
861 remote_port: conn.remote_port,
862 stream_id,
863 })
864}
865
866#[cfg(test)]
867mod tests;