1#![warn(missing_docs)]
2mod connection;
49mod exchange;
50pub mod h1;
51mod traits;
52
53use std::marker::PhantomData;
54
55pub use connection::Protocol;
56use connection::{Connection as Conn, DataChunk};
57use dashmap::DashMap;
58pub use exchange::{CollationEvent, CollatorConfig, Exchange, MessageMetadata, ParsedHttpMessage};
59pub use h1::{HttpRequest, HttpResponse};
60use h2session::{
61 H2ConnectionState,
62 StreamId,
63 TimestampNs,
64 is_http2_preface,
65 looks_like_http2_frame,
66};
67pub use traits::{DataEvent, Direction};
68
// NOTE(review): this constant is not referenced anywhere in this file; presumably it mirrors the
// default of `CollatorConfig::max_buf_size` — confirm before relying on it.
/// Buffer-size limit of 16384 (16 KiB, presumably bytes).
pub const MAX_BUF_SIZE: usize = 16384;
71
/// Reassembles captured data events of type `E` into HTTP messages and request/response
/// exchanges.
///
/// All public methods take `&self`; per-connection state is stored in [`DashMap`]s.
pub struct Collator<E: DataEvent> {
    /// Connection state for events with a non-zero `connection_id`, keyed by that id.
    connections: DashMap<u64, Conn>,
    /// Connection state for events whose `connection_id` is 0, keyed by `process_id`.
    /// NOTE(review): presumably these are TLS-library (SSL) events, judging by the name — confirm.
    ssl_connections: DashMap<u32, Conn>,
    /// Configuration applied to every connection.
    config: CollatorConfig,
    /// Binds the collator to its event type `E` without storing a value of it.
    _phantom: PhantomData<E>,
}
87
88impl<E: DataEvent> Default for Collator<E> {
89 fn default() -> Self {
90 Self::new()
91 }
92}
93
94impl<E: DataEvent> Collator<E> {
95 pub fn new() -> Self {
98 Self::with_config(CollatorConfig::default())
99 }
100
101 pub fn with_config(config: CollatorConfig) -> Self {
103 Self {
104 connections: DashMap::new(),
105 ssl_connections: DashMap::new(),
106 config,
107 _phantom: PhantomData,
108 }
109 }
110
111 pub fn with_max_buf_size(max_buf_size: usize) -> Self {
113 Self::with_config(CollatorConfig {
114 max_buf_size,
115 ..Default::default()
116 })
117 }
118
119 pub fn config(&self) -> &CollatorConfig {
121 &self.config
122 }
123
124 pub fn add_event(&self, event: E) -> Vec<CollationEvent> {
134 let direction = event.direction();
136 let timestamp_ns = TimestampNs(event.timestamp_ns());
137 let conn_id = event.connection_id();
138 let process_id = event.process_id();
139 let remote_port = event.remote_port();
140 let is_empty = event.payload().is_empty();
141
142 if is_empty {
143 return Vec::new();
144 }
145
146 if direction == Direction::Other {
148 return Vec::new();
149 }
150
151 let data = event.into_payload();
154
155 let chunk = DataChunk {
156 data,
157 timestamp_ns,
158 direction,
159 };
160
161 if conn_id != 0 {
164 let mut conn = self
165 .connections
166 .entry(conn_id)
167 .or_insert_with(|| Conn::new(process_id, remote_port));
168 Self::process_event_for_conn(
169 &mut conn,
170 chunk,
171 direction,
172 timestamp_ns,
173 remote_port,
174 conn_id,
175 process_id,
176 &self.config,
177 )
178 } else {
179 let mut conn = self
180 .ssl_connections
181 .entry(process_id)
182 .or_insert_with(|| Conn::new(process_id, remote_port));
183 Self::process_event_for_conn(
184 &mut conn,
185 chunk,
186 direction,
187 timestamp_ns,
188 remote_port,
189 conn_id,
190 process_id,
191 &self.config,
192 )
193 }
194 }
195
196 #[allow(clippy::too_many_arguments)]
199 fn process_event_for_conn(
200 conn: &mut Conn,
201 chunk: DataChunk,
202 direction: Direction,
203 timestamp_ns: TimestampNs,
204 remote_port: u16,
205 conn_id: u64,
206 process_id: u32,
207 config: &CollatorConfig,
208 ) -> Vec<CollationEvent> {
209 let buf: &[u8] = &chunk.data;
210
211 conn.last_activity_ns = timestamp_ns;
212 if remote_port != 0 && conn.remote_port.is_none() {
213 conn.remote_port = Some(remote_port);
214 }
215
216 if conn.protocol == Protocol::Unknown {
218 conn.protocol = detect_protocol(buf);
219 }
220
221 if conn.protocol != Protocol::Unknown {
223 let incoming_protocol = detect_protocol(buf);
224 if incoming_protocol != Protocol::Unknown && incoming_protocol != conn.protocol {
225 reset_connection_for_protocol_change(conn, incoming_protocol);
226 }
227 }
228
229 match direction {
231 Direction::Write => {
232 conn.request_body_size += buf.len();
233 if conn.request_body_size > config.max_body_size {
234 reset_connection_body_limit(conn);
235 return Vec::new();
236 }
237
238 if conn.protocol == Protocol::Http1 {
239 conn.h1_request_buffer.extend_from_slice(buf);
240 }
241 conn.request_chunks.push(chunk);
242
243 match conn.protocol {
244 Protocol::Http1 if conn.h1_request.is_none() => {
245 try_parse_http1_request_chunks(conn);
246 },
247 Protocol::Http2 => {
248 parse_http2_chunks(conn, direction);
249 },
250 _ => {},
251 }
252
253 if is_request_complete(conn) {
254 conn.request_complete = true;
255 }
256 },
257 Direction::Read => {
258 conn.response_body_size += buf.len();
259 if conn.response_body_size > config.max_body_size {
260 reset_connection_body_limit(conn);
261 return Vec::new();
262 }
263
264 if conn.protocol == Protocol::Http1 {
265 conn.h1_response_buffer.extend_from_slice(buf);
266 }
267 conn.response_chunks.push(chunk);
268
269 match conn.protocol {
270 Protocol::Http1 if conn.h1_response.is_none() => {
271 try_parse_http1_response_chunks(conn);
272 },
273 Protocol::Http2 => {
274 parse_http2_chunks(conn, direction);
275 },
276 _ => {},
277 }
278
279 if is_response_complete(conn) {
280 conn.response_complete = true;
281 }
282 },
283 Direction::Other => {
284 return Vec::new();
285 },
286 }
287
288 if conn.protocol == Protocol::Http2 && find_complete_h2_stream(conn).is_some() {
290 conn.request_complete = true;
291 conn.response_complete = true;
292 }
293
294 let mut events = Vec::new();
295
296 if config.emit_messages {
297 emit_message_events(conn, conn_id, process_id, &mut events);
298 }
299
300 if config.emit_exchanges && conn.request_complete && conn.response_complete {
301 if let Some(exchange) = build_exchange(conn) {
302 events.push(CollationEvent::Exchange(exchange));
303 }
304 reset_connection_after_exchange(conn);
305 }
306
307 events
308 }
309
310 pub fn cleanup(&self, current_time_ns: TimestampNs) {
315 self.connections.retain(|_, conn| {
316 current_time_ns.saturating_sub(conn.last_activity_ns) < self.config.timeout_ns
317 });
318 self.ssl_connections.retain(|_, conn| {
319 current_time_ns.saturating_sub(conn.last_activity_ns) < self.config.timeout_ns
320 });
321
322 for mut entry in self.connections.iter_mut() {
324 entry.h2_write_state.evict_stale_streams(current_time_ns);
325 entry.h2_read_state.evict_stale_streams(current_time_ns);
326 }
327 for mut entry in self.ssl_connections.iter_mut() {
328 entry.h2_write_state.evict_stale_streams(current_time_ns);
329 entry.h2_read_state.evict_stale_streams(current_time_ns);
330 }
331 }
332
333 pub fn remove_connection(&self, connection_id: u64, process_id: u32) {
338 if connection_id != 0 {
339 self.connections.remove(&connection_id);
340 } else {
341 self.ssl_connections.remove(&process_id);
342 }
343 }
344
345 pub fn close_connection(&self, connection_id: u64, process_id: u32) -> Vec<CollationEvent> {
353 let events = if connection_id != 0 {
354 match self.connections.get_mut(&connection_id) {
355 Some(mut guard) => {
356 finalize_and_emit(&mut guard, connection_id, process_id, &self.config)
357 },
358 None => Vec::new(),
359 }
360 } else {
361 match self.ssl_connections.get_mut(&process_id) {
362 Some(mut guard) => {
363 finalize_and_emit(&mut guard, connection_id, process_id, &self.config)
364 },
365 None => Vec::new(),
366 }
367 };
368
369 if connection_id != 0 {
371 self.connections.remove(&connection_id);
372 } else {
373 self.ssl_connections.remove(&process_id);
374 }
375
376 events
377 }
378}
379
380fn finalize_and_emit(
382 conn: &mut Conn,
383 connection_id: u64,
384 process_id: u32,
385 config: &CollatorConfig,
386) -> Vec<CollationEvent> {
387 if conn.protocol == Protocol::Http1
389 && conn.h1_response.is_none()
390 && !conn.h1_response_buffer.is_empty()
391 {
392 let timestamp = conn
393 .response_chunks
394 .first()
395 .map(|c| c.timestamp_ns)
396 .unwrap_or(TimestampNs(0));
397 conn.h1_response = h1::try_finalize_http1_response(&conn.h1_response_buffer, timestamp);
398 if conn.h1_response.is_some() {
399 conn.response_complete = true;
400 }
401 }
402
403 let mut events = Vec::new();
404
405 if config.emit_messages {
406 emit_message_events(conn, connection_id, process_id, &mut events);
407 }
408
409 if config.emit_exchanges
410 && conn.request_complete
411 && conn.response_complete
412 && let Some(exchange) = build_exchange(conn)
413 {
414 events.push(CollationEvent::Exchange(exchange));
415 }
416
417 events
418}
419
420fn emit_message_events(
423 conn: &mut Conn,
424 conn_id: u64,
425 process_id: u32,
426 events: &mut Vec<CollationEvent>,
427) {
428 match conn.protocol {
429 Protocol::Http1 => {
430 if let Some(ref req) = conn.h1_request
432 && !conn.h1_request_emitted
433 {
434 let metadata = MessageMetadata {
435 connection_id: conn_id,
436 process_id,
437 timestamp_ns: req.timestamp_ns,
438 stream_id: None,
439 remote_port: conn.remote_port,
440 protocol: conn.protocol,
441 };
442 events.push(CollationEvent::Message {
443 message: ParsedHttpMessage::Request(req.clone()),
444 metadata,
445 });
446 conn.h1_request_emitted = true;
447 }
448
449 if let Some(ref resp) = conn.h1_response
451 && !conn.h1_response_emitted
452 {
453 let metadata = MessageMetadata {
454 connection_id: conn_id,
455 process_id,
456 timestamp_ns: resp.timestamp_ns,
457 stream_id: None,
458 remote_port: conn.remote_port,
459 protocol: conn.protocol,
460 };
461 events.push(CollationEvent::Message {
462 message: ParsedHttpMessage::Response(resp.clone()),
463 metadata,
464 });
465 conn.h1_response_emitted = true;
466 }
467 },
468 Protocol::Http2 => {
469 for (&stream_id, msg) in &conn.pending_requests {
471 if !conn.h2_emitted_requests.contains(&stream_id)
472 && let Some(req) = msg.to_http_request()
473 {
474 let metadata = MessageMetadata {
475 connection_id: conn_id,
476 process_id,
477 timestamp_ns: msg.end_stream_timestamp_ns,
478 stream_id: Some(stream_id),
479 remote_port: conn.remote_port,
480 protocol: conn.protocol,
481 };
482 events.push(CollationEvent::Message {
483 message: ParsedHttpMessage::Request(req),
484 metadata,
485 });
486 }
487 }
488 conn.h2_emitted_requests
490 .extend(conn.pending_requests.keys().copied());
491
492 for (&stream_id, msg) in &conn.pending_responses {
494 if !conn.h2_emitted_responses.contains(&stream_id)
495 && let Some(resp) = msg.to_http_response()
496 {
497 let metadata = MessageMetadata {
498 connection_id: conn_id,
499 process_id,
500 timestamp_ns: msg.first_frame_timestamp_ns,
501 stream_id: Some(stream_id),
502 remote_port: conn.remote_port,
503 protocol: conn.protocol,
504 };
505 events.push(CollationEvent::Message {
506 message: ParsedHttpMessage::Response(resp),
507 metadata,
508 });
509 }
510 }
511 conn.h2_emitted_responses
513 .extend(conn.pending_responses.keys().copied());
514 },
515 Protocol::Unknown => {},
516 }
517}
518
519fn reset_connection_after_exchange(conn: &mut Conn) {
521 conn.request_complete = false;
522 conn.response_complete = false;
523
524 if conn.protocol == Protocol::Http1 {
525 conn.request_chunks.clear();
527 conn.response_chunks.clear();
528 conn.h1_request = None;
529 conn.h1_response = None;
530 conn.h1_request_emitted = false;
531 conn.h1_response_emitted = false;
532 conn.h1_request_buffer.clear();
533 conn.h1_response_buffer.clear();
534 conn.protocol = Protocol::Unknown;
535 } else if conn.protocol == Protocol::Http2 {
536 if conn.pending_requests.is_empty() && conn.pending_responses.is_empty() {
542 conn.request_chunks.clear();
543 conn.response_chunks.clear();
544 conn.h2_write_state.clear_buffer();
545 conn.h2_read_state.clear_buffer();
546 }
547 }
548
549 conn.request_body_size = 0;
551 conn.response_body_size = 0;
552}
553
554fn reset_connection_body_limit(conn: &mut Conn) {
557 conn.request_chunks.clear();
558 conn.response_chunks.clear();
559 conn.h1_request_buffer.clear();
560 conn.h1_response_buffer.clear();
561 conn.h1_request = None;
562 conn.h1_response = None;
563 conn.h1_request_emitted = false;
564 conn.h1_response_emitted = false;
565 conn.h2_write_state = H2ConnectionState::new();
566 conn.h2_read_state = H2ConnectionState::new();
567 conn.pending_requests.clear();
568 conn.pending_responses.clear();
569 conn.h2_emitted_requests.clear();
570 conn.h2_emitted_responses.clear();
571 conn.ready_streams.clear();
572 conn.request_complete = false;
573 conn.response_complete = false;
574 conn.request_body_size = 0;
575 conn.response_body_size = 0;
576 conn.protocol = Protocol::Unknown;
577}
578
579fn reset_connection_for_protocol_change(conn: &mut Conn, new_protocol: Protocol) {
582 conn.request_chunks.clear();
583 conn.response_chunks.clear();
584 conn.h1_request_buffer.clear();
585 conn.h1_response_buffer.clear();
586 conn.h1_request = None;
587 conn.h1_response = None;
588 conn.h1_request_emitted = false;
589 conn.h1_response_emitted = false;
590 conn.h2_write_state = H2ConnectionState::new();
591 conn.h2_read_state = H2ConnectionState::new();
592 conn.pending_requests.clear();
593 conn.pending_responses.clear();
594 conn.h2_emitted_requests.clear();
595 conn.h2_emitted_responses.clear();
596 conn.ready_streams.clear();
597 conn.request_complete = false;
598 conn.response_complete = false;
599 conn.request_body_size = 0;
600 conn.response_body_size = 0;
601 conn.protocol = new_protocol;
602}
603
604pub fn detect_protocol(data: &[u8]) -> Protocol {
610 if is_http2_preface(data) {
612 return Protocol::Http2;
613 }
614
615 if looks_like_http2_frame(data) {
617 return Protocol::Http2;
618 }
619
620 if h1::is_http1_request(data) || h1::is_http1_response(data) {
622 return Protocol::Http1;
623 }
624
625 Protocol::Unknown
626}
627
628fn parse_http2_chunks(conn: &mut Conn, direction: Direction) {
637 let last_chunk_is_preface = match direction {
642 Direction::Write => conn
643 .request_chunks
644 .last()
645 .is_some_and(|c| is_http2_preface(&c.data)),
646 Direction::Read => conn
647 .response_chunks
648 .last()
649 .is_some_and(|c| is_http2_preface(&c.data)),
650 Direction::Other => false,
651 };
652 let current_state_has_preface = match direction {
653 Direction::Write => conn.h2_write_state.preface_received,
654 Direction::Read => conn.h2_read_state.preface_received,
655 Direction::Other => false,
656 };
657
658 if last_chunk_is_preface && current_state_has_preface {
659 conn.h2_write_state = H2ConnectionState::new();
660 conn.h2_read_state = H2ConnectionState::new();
661 conn.pending_requests.clear();
662 conn.pending_responses.clear();
663 conn.h2_emitted_requests.clear();
664 conn.h2_emitted_responses.clear();
665 conn.ready_streams.clear();
666 }
667
668 let (chunks, h2_state) = match direction {
669 Direction::Write => (&conn.request_chunks, &mut conn.h2_write_state),
670 Direction::Read => (&conn.response_chunks, &mut conn.h2_read_state),
671 Direction::Other => return,
672 };
673
674 let chunk = match chunks.last() {
675 Some(c) => c,
676 None => return,
677 };
678
679 let _ = h2_state.feed(&chunk.data, chunk.timestamp_ns);
681
682 while let Some((stream_id, msg)) = h2_state.try_pop() {
685 if msg.is_request() {
686 conn.pending_requests.insert(stream_id, msg);
687 if conn.pending_responses.contains_key(&stream_id) {
688 conn.ready_streams.insert(stream_id);
689 }
690 } else if msg.is_response() {
691 conn.pending_responses.insert(stream_id, msg);
692 if conn.pending_requests.contains_key(&stream_id) {
693 conn.ready_streams.insert(stream_id);
694 }
695 }
696 }
697}
698
699fn find_complete_h2_stream(conn: &Conn) -> Option<StreamId> {
702 conn.ready_streams.iter().next().copied()
703}
704
705fn try_parse_http1_request_chunks(conn: &mut Conn) {
708 let timestamp = conn
709 .request_chunks
710 .last()
711 .map(|c| c.timestamp_ns)
712 .unwrap_or(TimestampNs(0));
713 conn.h1_request = h1::try_parse_http1_request(&conn.h1_request_buffer, timestamp);
714}
715
716fn try_parse_http1_response_chunks(conn: &mut Conn) {
719 let timestamp = conn
720 .response_chunks
721 .first()
722 .map(|c| c.timestamp_ns)
723 .unwrap_or(TimestampNs(0));
724 conn.h1_response = h1::try_parse_http1_response(&conn.h1_response_buffer, timestamp);
725}
726
727fn is_request_complete(conn: &Conn) -> bool {
728 match conn.protocol {
729 Protocol::Http1 => conn.h1_request.is_some(),
730 Protocol::Http2 => find_complete_h2_stream(conn).is_some(),
731 Protocol::Unknown => false,
732 }
733}
734
735fn is_response_complete(conn: &Conn) -> bool {
736 match conn.protocol {
737 Protocol::Http1 => conn.h1_response.is_some(),
738 Protocol::Http2 => find_complete_h2_stream(conn).is_some(),
739 Protocol::Unknown => false,
740 }
741}
742
743fn build_exchange(conn: &mut Conn) -> Option<Exchange> {
744 let (request, response, stream_id, latency_ns) = match conn.protocol {
745 Protocol::Http1 => {
746 let req = conn.h1_request.take()?;
748 let resp = conn.h1_response.take()?;
749 let latency = resp.timestamp_ns.saturating_sub(req.timestamp_ns);
750 (req, resp, None, latency)
751 },
752 Protocol::Http2 => {
753 let sid = find_complete_h2_stream(conn)?;
754 let msg_req = conn.pending_requests.remove(&sid)?;
755 let msg_resp = conn.pending_responses.remove(&sid)?;
756
757 conn.h2_emitted_requests.remove(&sid);
759 conn.h2_emitted_responses.remove(&sid);
760 conn.ready_streams.remove(&sid);
761
762 let request_complete_time = msg_req.end_stream_timestamp_ns;
766 let response_start_time = msg_resp.first_frame_timestamp_ns;
767
768 let req = msg_req.into_http_request()?;
769 let resp = msg_resp.into_http_response()?;
770
771 let latency = response_start_time.saturating_sub(request_complete_time);
772 (req, resp, Some(sid), latency)
773 },
774 Protocol::Unknown => return None,
775 };
776
777 Some(Exchange {
778 request,
779 response,
780 latency_ns,
781 protocol: conn.protocol,
782 process_id: conn.process_id,
783 remote_port: conn.remote_port,
784 stream_id,
785 })
786}
787
// Unit tests for this module live in the out-of-line `tests` submodule.
#[cfg(test)]
mod tests;