1#![warn(missing_docs)]
2mod connection;
49mod exchange;
50pub mod h1;
51mod traits;
52
53use std::marker::PhantomData;
54
55pub use connection::Protocol;
56use connection::{Connection as Conn, DataChunk};
57use dashmap::DashMap;
58pub use exchange::{CollationEvent, CollatorConfig, Exchange, MessageMetadata, ParsedHttpMessage};
59pub use h1::{HttpRequest, HttpResponse};
60use h2session::{
61 H2ConnectionState,
62 StreamId,
63 TimestampNs,
64 is_http2_preface,
65 looks_like_http2_frame,
66};
67pub use traits::{DataEvent, Direction};
68
/// Buffer size constant: 16 KiB (16 * 1024 = 16384 bytes).
///
/// NOTE(review): nothing in this file reads this constant; presumably it is the
/// default for `CollatorConfig::max_buf_size` — confirm against the `exchange`
/// module.
pub const MAX_BUF_SIZE: usize = 16 * 1024;
71
72pub struct Collator<E: DataEvent> {
78 connections: DashMap<u128, Conn>,
80 ssl_connections: DashMap<u32, Conn>,
82 config: CollatorConfig,
84 _phantom: PhantomData<E>,
86}
87
88impl<E: DataEvent> Default for Collator<E> {
89 fn default() -> Self {
90 Self::new()
91 }
92}
93
94impl<E: DataEvent> Collator<E> {
95 pub fn new() -> Self {
98 Self::with_config(CollatorConfig::default())
99 }
100
101 pub fn with_config(config: CollatorConfig) -> Self {
103 Self {
104 connections: DashMap::new(),
105 ssl_connections: DashMap::new(),
106 config,
107 _phantom: PhantomData,
108 }
109 }
110
111 pub fn with_max_buf_size(max_buf_size: usize) -> Self {
113 Self::with_config(CollatorConfig {
114 max_buf_size,
115 ..Default::default()
116 })
117 }
118
119 pub fn config(&self) -> &CollatorConfig {
121 &self.config
122 }
123
124 pub fn add_event(&self, event: E) -> Vec<CollationEvent> {
134 let direction = event.direction();
136 let timestamp_ns = TimestampNs(event.timestamp_ns());
137 let conn_id = event.connection_id();
138 let process_id = event.process_id();
139 let remote_port = event.remote_port();
140 let is_empty = event.payload().is_empty();
141
142 if is_empty {
143 return Vec::new();
144 }
145
146 if direction == Direction::Other {
148 return Vec::new();
149 }
150
151 let data = event.into_payload();
154
155 let chunk = DataChunk {
156 data,
157 timestamp_ns,
158 direction,
159 };
160
161 if conn_id != 0 {
164 let mut conn = self
165 .connections
166 .entry(conn_id)
167 .or_insert_with(|| Conn::new(process_id, remote_port));
168 Self::process_event_for_conn(
169 &mut conn,
170 chunk,
171 direction,
172 timestamp_ns,
173 remote_port,
174 conn_id,
175 process_id,
176 &self.config,
177 )
178 } else {
179 let mut conn = self
180 .ssl_connections
181 .entry(process_id)
182 .or_insert_with(|| Conn::new(process_id, remote_port));
183 Self::process_event_for_conn(
184 &mut conn,
185 chunk,
186 direction,
187 timestamp_ns,
188 remote_port,
189 conn_id,
190 process_id,
191 &self.config,
192 )
193 }
194 }
195
196 #[allow(clippy::too_many_arguments)]
199 fn process_event_for_conn(
200 conn: &mut Conn,
201 chunk: DataChunk,
202 direction: Direction,
203 timestamp_ns: TimestampNs,
204 remote_port: u16,
205 conn_id: u128,
206 process_id: u32,
207 config: &CollatorConfig,
208 ) -> Vec<CollationEvent> {
209 let buf: &[u8] = &chunk.data;
210
211 conn.last_activity_ns = timestamp_ns;
212 if remote_port != 0 && conn.remote_port.is_none() {
213 conn.remote_port = Some(remote_port);
214 }
215
216 if conn.protocol == Protocol::Unknown {
218 conn.protocol = detect_protocol(buf);
219 }
220
221 if conn.protocol != Protocol::Unknown {
223 let incoming_protocol = detect_protocol(buf);
224 if incoming_protocol != Protocol::Unknown && incoming_protocol != conn.protocol {
225 reset_connection_for_protocol_change(conn, incoming_protocol);
226 }
227 }
228
229 let mut events = Vec::new();
230
231 match direction {
233 Direction::Write => {
234 conn.request_body_size += buf.len();
235 if conn.request_body_size > config.max_body_size {
236 reset_connection_body_limit(conn);
237 return Vec::new();
238 }
239
240 if conn.protocol == Protocol::Http1 || conn.protocol == Protocol::Unknown {
242 conn.h1_write_buffer.extend_from_slice(buf);
243 }
244 conn.request_chunks.push(chunk);
245
246 match conn.protocol {
247 Protocol::Http1 => {
248 drain_parse_emit_http1_write(
249 conn,
250 conn_id,
251 process_id,
252 config,
253 &mut events,
254 );
255 },
256 Protocol::Http2 => {
257 parse_http2_chunks(conn, direction);
258 },
259 Protocol::Unknown => {
262 drain_parse_emit_http1_unknown_write(
263 conn,
264 conn_id,
265 process_id,
266 config,
267 &mut events,
268 );
269 },
270 }
271
272 if is_request_complete(conn) {
275 conn.request_complete = true;
276 }
277 if is_response_complete(conn) {
278 conn.response_complete = true;
279 }
280 },
281 Direction::Read => {
282 conn.response_body_size += buf.len();
283 if conn.response_body_size > config.max_body_size {
284 reset_connection_body_limit(conn);
285 return Vec::new();
286 }
287
288 if conn.protocol == Protocol::Http1 || conn.protocol == Protocol::Unknown {
290 conn.h1_read_buffer.extend_from_slice(buf);
291 }
292 conn.response_chunks.push(chunk);
293
294 match conn.protocol {
295 Protocol::Http1 => {
296 drain_parse_emit_http1_read(
297 conn,
298 conn_id,
299 process_id,
300 config,
301 &mut events,
302 );
303 },
304 Protocol::Http2 => {
305 parse_http2_chunks(conn, direction);
306 },
307 Protocol::Unknown => {
310 drain_parse_emit_http1_unknown_read(
311 conn,
312 conn_id,
313 process_id,
314 config,
315 &mut events,
316 );
317 },
318 }
319
320 if is_request_complete(conn) {
323 conn.request_complete = true;
324 }
325 if is_response_complete(conn) {
326 conn.response_complete = true;
327 }
328 },
329 Direction::Other => {
330 return Vec::new();
331 },
332 }
333
334 if conn.protocol == Protocol::Http2 && find_complete_h2_stream(conn).is_some() {
336 conn.request_complete = true;
337 conn.response_complete = true;
338 }
339
340 if config.emit_messages {
346 emit_message_events(conn, conn_id, process_id, &mut events);
347 }
348
349 if config.emit_exchanges && conn.request_complete && conn.response_complete {
350 if let Some(exchange) = build_exchange(conn) {
351 events.push(CollationEvent::Exchange(exchange));
352 }
353 reset_connection_after_exchange(conn);
354 }
355
356 events
357 }
358
359 pub fn cleanup(&self, current_time_ns: TimestampNs) {
364 self.connections.retain(|_, conn| {
365 current_time_ns.saturating_sub(conn.last_activity_ns) < self.config.timeout_ns
366 });
367 self.ssl_connections.retain(|_, conn| {
368 current_time_ns.saturating_sub(conn.last_activity_ns) < self.config.timeout_ns
369 });
370
371 for mut entry in self.connections.iter_mut() {
373 entry.h2_write_state.evict_stale_streams(current_time_ns);
374 entry.h2_read_state.evict_stale_streams(current_time_ns);
375 }
376 for mut entry in self.ssl_connections.iter_mut() {
377 entry.h2_write_state.evict_stale_streams(current_time_ns);
378 entry.h2_read_state.evict_stale_streams(current_time_ns);
379 }
380 }
381
382 pub fn remove_connection(&self, connection_id: u128, process_id: u32) {
387 if connection_id != 0 {
388 self.connections.remove(&connection_id);
389 } else {
390 self.ssl_connections.remove(&process_id);
391 }
392 }
393
394 pub fn close_connection(&self, connection_id: u128, process_id: u32) -> Vec<CollationEvent> {
402 let events = if connection_id != 0 {
403 match self.connections.get_mut(&connection_id) {
404 Some(mut guard) => {
405 finalize_and_emit(&mut guard, connection_id, process_id, &self.config)
406 },
407 None => Vec::new(),
408 }
409 } else {
410 match self.ssl_connections.get_mut(&process_id) {
411 Some(mut guard) => {
412 finalize_and_emit(&mut guard, connection_id, process_id, &self.config)
413 },
414 None => Vec::new(),
415 }
416 };
417
418 if connection_id != 0 {
420 self.connections.remove(&connection_id);
421 } else {
422 self.ssl_connections.remove(&process_id);
423 }
424
425 events
426 }
427}
428
429fn finalize_and_emit(
431 conn: &mut Conn,
432 connection_id: u128,
433 process_id: u32,
434 config: &CollatorConfig,
435) -> Vec<CollationEvent> {
436 if conn.protocol == Protocol::Http1
438 && conn.h1_response.is_none()
439 && !conn.h1_read_buffer.is_empty()
440 {
441 let timestamp = conn
442 .response_chunks
443 .first()
444 .map(|c| c.timestamp_ns)
445 .unwrap_or(TimestampNs(0));
446 conn.h1_response = h1::try_finalize_http1_response(&conn.h1_read_buffer, timestamp);
447 if conn.h1_response.is_some() {
448 conn.response_complete = true;
449 }
450 }
451
452 let mut events = Vec::new();
453
454 if config.emit_messages {
455 emit_message_events(conn, connection_id, process_id, &mut events);
456 }
457
458 if config.emit_exchanges
459 && conn.request_complete
460 && conn.response_complete
461 && let Some(exchange) = build_exchange(conn)
462 {
463 events.push(CollationEvent::Exchange(exchange));
464 }
465
466 events
467}
468
469fn emit_message_events(
472 conn: &mut Conn,
473 conn_id: u128,
474 process_id: u32,
475 events: &mut Vec<CollationEvent>,
476) {
477 match conn.protocol {
478 Protocol::Http1 => {
479 if let Some(ref req) = conn.h1_request
481 && !conn.h1_request_emitted
482 {
483 let metadata = MessageMetadata {
484 connection_id: conn_id,
485 process_id,
486 timestamp_ns: req.timestamp_ns,
487 stream_id: None,
488 remote_port: conn.remote_port,
489 protocol: conn.protocol,
490 };
491 events.push(CollationEvent::Message {
492 message: ParsedHttpMessage::Request(req.clone()),
493 metadata,
494 });
495 conn.h1_request_emitted = true;
496 }
497
498 if let Some(ref resp) = conn.h1_response
500 && !conn.h1_response_emitted
501 {
502 let metadata = MessageMetadata {
503 connection_id: conn_id,
504 process_id,
505 timestamp_ns: resp.timestamp_ns,
506 stream_id: None,
507 remote_port: conn.remote_port,
508 protocol: conn.protocol,
509 };
510 events.push(CollationEvent::Message {
511 message: ParsedHttpMessage::Response(resp.clone()),
512 metadata,
513 });
514 conn.h1_response_emitted = true;
515 }
516 },
517 Protocol::Http2 => {
518 for (&stream_id, msg) in &conn.pending_requests {
520 if !conn.h2_emitted_requests.contains(&stream_id)
521 && let Some(req) = msg.to_http_request()
522 {
523 let metadata = MessageMetadata {
524 connection_id: conn_id,
525 process_id,
526 timestamp_ns: msg.end_stream_timestamp_ns,
527 stream_id: Some(stream_id),
528 remote_port: conn.remote_port,
529 protocol: conn.protocol,
530 };
531 events.push(CollationEvent::Message {
532 message: ParsedHttpMessage::Request(req),
533 metadata,
534 });
535 }
536 }
537 conn.h2_emitted_requests
539 .extend(conn.pending_requests.keys().copied());
540
541 for (&stream_id, msg) in &conn.pending_responses {
543 if !conn.h2_emitted_responses.contains(&stream_id)
544 && let Some(resp) = msg.to_http_response()
545 {
546 let metadata = MessageMetadata {
547 connection_id: conn_id,
548 process_id,
549 timestamp_ns: msg.first_frame_timestamp_ns,
550 stream_id: Some(stream_id),
551 remote_port: conn.remote_port,
552 protocol: conn.protocol,
553 };
554 events.push(CollationEvent::Message {
555 message: ParsedHttpMessage::Response(resp),
556 metadata,
557 });
558 }
559 }
560 conn.h2_emitted_responses
562 .extend(conn.pending_responses.keys().copied());
563 },
564 Protocol::Unknown => {},
565 }
566}
567
568fn reset_connection_after_exchange(conn: &mut Conn) {
570 conn.request_complete = false;
571 conn.response_complete = false;
572
573 if conn.protocol == Protocol::Http1 {
574 conn.request_chunks.clear();
576 conn.response_chunks.clear();
577 conn.h1_request = None;
578 conn.h1_response = None;
579 conn.h1_write_parsed = false;
580 conn.h1_read_parsed = false;
581 conn.h1_request_emitted = false;
582 conn.h1_response_emitted = false;
583 conn.h1_write_buffer.clear();
584 conn.h1_read_buffer.clear();
585 conn.protocol = Protocol::Unknown;
586 } else if conn.protocol == Protocol::Http2 {
587 if conn.pending_requests.is_empty() && conn.pending_responses.is_empty() {
593 conn.request_chunks.clear();
594 conn.response_chunks.clear();
595 conn.h2_write_state.clear_buffer();
596 conn.h2_read_state.clear_buffer();
597 }
598 }
599
600 conn.request_body_size = 0;
602 conn.response_body_size = 0;
603}
604
605fn reset_connection_body_limit(conn: &mut Conn) {
608 conn.request_chunks.clear();
609 conn.response_chunks.clear();
610 conn.h1_write_buffer.clear();
611 conn.h1_read_buffer.clear();
612 conn.h1_request = None;
613 conn.h1_response = None;
614 conn.h1_write_parsed = false;
615 conn.h1_read_parsed = false;
616 conn.h1_request_emitted = false;
617 conn.h1_response_emitted = false;
618 conn.h2_write_state = H2ConnectionState::new();
619 conn.h2_read_state = H2ConnectionState::new();
620 conn.pending_requests.clear();
621 conn.pending_responses.clear();
622 conn.h2_emitted_requests.clear();
623 conn.h2_emitted_responses.clear();
624 conn.ready_streams.clear();
625 conn.request_complete = false;
626 conn.response_complete = false;
627 conn.request_body_size = 0;
628 conn.response_body_size = 0;
629 conn.protocol = Protocol::Unknown;
630}
631
632fn reset_connection_for_protocol_change(conn: &mut Conn, new_protocol: Protocol) {
635 conn.request_chunks.clear();
636 conn.response_chunks.clear();
637 conn.h1_write_buffer.clear();
638 conn.h1_read_buffer.clear();
639 conn.h1_request = None;
640 conn.h1_response = None;
641 conn.h1_write_parsed = false;
642 conn.h1_read_parsed = false;
643 conn.h1_request_emitted = false;
644 conn.h1_response_emitted = false;
645 conn.h2_write_state = H2ConnectionState::new();
646 conn.h2_read_state = H2ConnectionState::new();
647 conn.pending_requests.clear();
648 conn.pending_responses.clear();
649 conn.h2_emitted_requests.clear();
650 conn.h2_emitted_responses.clear();
651 conn.ready_streams.clear();
652 conn.request_complete = false;
653 conn.response_complete = false;
654 conn.request_body_size = 0;
655 conn.response_body_size = 0;
656 conn.protocol = new_protocol;
657}
658
659pub fn detect_protocol(data: &[u8]) -> Protocol {
665 if is_http2_preface(data) {
667 return Protocol::Http2;
668 }
669
670 if looks_like_http2_frame(data) {
672 return Protocol::Http2;
673 }
674
675 if h1::is_http1_request(data) || h1::is_http1_response(data) {
677 return Protocol::Http1;
678 }
679
680 Protocol::Unknown
681}
682
683fn parse_http2_chunks(conn: &mut Conn, direction: Direction) {
692 let last_chunk_is_preface = match direction {
697 Direction::Write => conn
698 .request_chunks
699 .last()
700 .is_some_and(|c| is_http2_preface(&c.data)),
701 Direction::Read => conn
702 .response_chunks
703 .last()
704 .is_some_and(|c| is_http2_preface(&c.data)),
705 Direction::Other => false,
706 };
707 let current_state_has_preface = match direction {
708 Direction::Write => conn.h2_write_state.preface_received,
709 Direction::Read => conn.h2_read_state.preface_received,
710 Direction::Other => false,
711 };
712
713 if last_chunk_is_preface && current_state_has_preface {
714 conn.h2_write_state = H2ConnectionState::new();
715 conn.h2_read_state = H2ConnectionState::new();
716 conn.pending_requests.clear();
717 conn.pending_responses.clear();
718 conn.h2_emitted_requests.clear();
719 conn.h2_emitted_responses.clear();
720 conn.ready_streams.clear();
721 }
722
723 let (chunks, h2_state) = match direction {
724 Direction::Write => (&conn.request_chunks, &mut conn.h2_write_state),
725 Direction::Read => (&conn.response_chunks, &mut conn.h2_read_state),
726 Direction::Other => return,
727 };
728
729 let chunk = match chunks.last() {
730 Some(c) => c,
731 None => return,
732 };
733
734 let _ = h2_state.feed(&chunk.data, chunk.timestamp_ns);
736
737 while let Some((stream_id, msg)) = h2_state.try_pop() {
740 if msg.is_request() {
741 conn.pending_requests.insert(stream_id, msg);
742 if conn.pending_responses.contains_key(&stream_id) {
743 conn.ready_streams.insert(stream_id);
744 }
745 } else if msg.is_response() {
746 conn.pending_responses.insert(stream_id, msg);
747 if conn.pending_requests.contains_key(&stream_id) {
748 conn.ready_streams.insert(stream_id);
749 }
750 }
751 }
752}
753
754fn find_complete_h2_stream(conn: &Conn) -> Option<StreamId> {
757 conn.ready_streams.iter().next().copied()
758}
759
760fn drain_parse_emit_http1_write(
769 conn: &mut Conn,
770 conn_id: u128,
771 process_id: u32,
772 config: &CollatorConfig,
773 events: &mut Vec<CollationEvent>,
774) {
775 loop {
776 let timestamp = conn
777 .request_chunks
778 .last()
779 .map(|c| c.timestamp_ns)
780 .unwrap_or(TimestampNs(0));
781
782 if let Some((req, consumed)) =
784 h1::try_parse_http1_request_sized(&conn.h1_write_buffer, timestamp)
785 {
786 conn.h1_write_buffer.drain(..consumed);
787 emit_h1_request(conn, conn_id, process_id, config, events, req);
788 } else if let Some((resp, consumed)) =
789 h1::try_parse_http1_response_sized(&conn.h1_write_buffer, timestamp)
790 {
791 conn.h1_write_buffer.drain(..consumed);
792 emit_h1_response(conn, conn_id, process_id, config, events, resp);
793 } else {
794 break;
795 }
796 }
797}
798
799fn drain_parse_emit_http1_read(
802 conn: &mut Conn,
803 conn_id: u128,
804 process_id: u32,
805 config: &CollatorConfig,
806 events: &mut Vec<CollationEvent>,
807) {
808 loop {
809 let timestamp = conn
810 .response_chunks
811 .last()
812 .map(|c| c.timestamp_ns)
813 .unwrap_or(TimestampNs(0));
814
815 if let Some((resp, consumed)) =
817 h1::try_parse_http1_response_sized(&conn.h1_read_buffer, timestamp)
818 {
819 conn.h1_read_buffer.drain(..consumed);
820 emit_h1_response(conn, conn_id, process_id, config, events, resp);
821 } else if let Some((req, consumed)) =
822 h1::try_parse_http1_request_sized(&conn.h1_read_buffer, timestamp)
823 {
824 conn.h1_read_buffer.drain(..consumed);
825 emit_h1_request(conn, conn_id, process_id, config, events, req);
826 } else {
827 break;
828 }
829 }
830}
831
832fn drain_parse_emit_http1_unknown_write(
835 conn: &mut Conn,
836 conn_id: u128,
837 process_id: u32,
838 config: &CollatorConfig,
839 events: &mut Vec<CollationEvent>,
840) {
841 let timestamp = conn
842 .request_chunks
843 .last()
844 .map(|c| c.timestamp_ns)
845 .unwrap_or(TimestampNs(0));
846 if let Some((req, consumed)) =
847 h1::try_parse_http1_request_sized(&conn.h1_write_buffer, timestamp)
848 {
849 conn.protocol = Protocol::Http1;
850 conn.h1_write_buffer.drain(..consumed);
851 emit_h1_request(conn, conn_id, process_id, config, events, req);
852 drain_parse_emit_http1_write(conn, conn_id, process_id, config, events);
853 } else if let Some((resp, consumed)) =
854 h1::try_parse_http1_response_sized(&conn.h1_write_buffer, timestamp)
855 {
856 conn.protocol = Protocol::Http1;
857 conn.h1_write_buffer.drain(..consumed);
858 emit_h1_response(conn, conn_id, process_id, config, events, resp);
859 drain_parse_emit_http1_write(conn, conn_id, process_id, config, events);
860 }
861}
862
863fn drain_parse_emit_http1_unknown_read(
866 conn: &mut Conn,
867 conn_id: u128,
868 process_id: u32,
869 config: &CollatorConfig,
870 events: &mut Vec<CollationEvent>,
871) {
872 let timestamp = conn
873 .response_chunks
874 .last()
875 .map(|c| c.timestamp_ns)
876 .unwrap_or(TimestampNs(0));
877 if let Some((resp, consumed)) =
878 h1::try_parse_http1_response_sized(&conn.h1_read_buffer, timestamp)
879 {
880 conn.protocol = Protocol::Http1;
881 conn.h1_read_buffer.drain(..consumed);
882 emit_h1_response(conn, conn_id, process_id, config, events, resp);
883 drain_parse_emit_http1_read(conn, conn_id, process_id, config, events);
884 } else if let Some((req, consumed)) =
885 h1::try_parse_http1_request_sized(&conn.h1_read_buffer, timestamp)
886 {
887 conn.protocol = Protocol::Http1;
888 conn.h1_read_buffer.drain(..consumed);
889 emit_h1_request(conn, conn_id, process_id, config, events, req);
890 drain_parse_emit_http1_read(conn, conn_id, process_id, config, events);
891 }
892}
893
894fn emit_h1_request(
898 conn: &mut Conn,
899 conn_id: u128,
900 process_id: u32,
901 config: &CollatorConfig,
902 events: &mut Vec<CollationEvent>,
903 req: h1::HttpRequest,
904) {
905 if config.emit_messages {
906 let metadata = MessageMetadata {
907 connection_id: conn_id,
908 process_id,
909 timestamp_ns: req.timestamp_ns,
910 stream_id: None,
911 remote_port: conn.remote_port,
912 protocol: Protocol::Http1,
913 };
914 events.push(CollationEvent::Message {
915 message: ParsedHttpMessage::Request(req.clone()),
916 metadata,
917 });
918 }
919 conn.h1_request = Some(req);
920 conn.h1_request_emitted = true;
921}
922
923fn emit_h1_response(
927 conn: &mut Conn,
928 conn_id: u128,
929 process_id: u32,
930 config: &CollatorConfig,
931 events: &mut Vec<CollationEvent>,
932 resp: h1::HttpResponse,
933) {
934 if config.emit_messages {
935 let metadata = MessageMetadata {
936 connection_id: conn_id,
937 process_id,
938 timestamp_ns: resp.timestamp_ns,
939 stream_id: None,
940 remote_port: conn.remote_port,
941 protocol: Protocol::Http1,
942 };
943 events.push(CollationEvent::Message {
944 message: ParsedHttpMessage::Response(resp.clone()),
945 metadata,
946 });
947 }
948 conn.h1_response = Some(resp);
949 conn.h1_response_emitted = true;
950}
951
952fn is_request_complete(conn: &Conn) -> bool {
953 match conn.protocol {
954 Protocol::Http1 => conn.h1_request.is_some(),
955 Protocol::Http2 => find_complete_h2_stream(conn).is_some(),
956 Protocol::Unknown => false,
957 }
958}
959
960fn is_response_complete(conn: &Conn) -> bool {
961 match conn.protocol {
962 Protocol::Http1 => conn.h1_response.is_some(),
963 Protocol::Http2 => find_complete_h2_stream(conn).is_some(),
964 Protocol::Unknown => false,
965 }
966}
967
968fn build_exchange(conn: &mut Conn) -> Option<Exchange> {
969 let (request, response, stream_id, latency_ns) = match conn.protocol {
970 Protocol::Http1 => {
971 let req = conn.h1_request.take()?;
973 let resp = conn.h1_response.take()?;
974 let latency = resp.timestamp_ns.saturating_sub(req.timestamp_ns);
975 (req, resp, None, latency)
976 },
977 Protocol::Http2 => {
978 let sid = find_complete_h2_stream(conn)?;
979 let msg_req = conn.pending_requests.remove(&sid)?;
980 let msg_resp = conn.pending_responses.remove(&sid)?;
981
982 conn.h2_emitted_requests.remove(&sid);
984 conn.h2_emitted_responses.remove(&sid);
985 conn.ready_streams.remove(&sid);
986
987 let request_complete_time = msg_req.end_stream_timestamp_ns;
991 let response_start_time = msg_resp.first_frame_timestamp_ns;
992
993 let req = msg_req.into_http_request()?;
994 let resp = msg_resp.into_http_response()?;
995
996 let latency = response_start_time.saturating_sub(request_complete_time);
997 (req, resp, Some(sid), latency)
998 },
999 Protocol::Unknown => return None,
1000 };
1001
1002 Some(Exchange {
1003 request,
1004 response,
1005 latency_ns,
1006 protocol: conn.protocol,
1007 process_id: conn.process_id,
1008 remote_port: conn.remote_port,
1009 stream_id,
1010 })
1011}
1012
1013#[cfg(test)]
1014mod tests;