1use crate::memdx::extframe::decode_res_ext_frames;
20use crate::memdx::packet::ResponsePacket;
21use crate::options::orphan_reporter::OrphanReporterConfig;
22use serde_json::json;
23use std::cmp::Reverse;
24use std::collections::BinaryHeap;
25use std::collections::HashMap;
26use std::fmt::{Display, Formatter, Result as FmtResult};
27use std::mem;
28use std::net::SocketAddr;
29use std::sync::atomic::{AtomicU64, Ordering};
30use std::sync::{Arc, RwLock};
31use std::time::Duration;
32use tokio::time::{interval_at, Instant, MissedTickBehavior};
33use tracing::{trace, warn};
34
/// Connection details captured alongside an orphaned response.
///
/// Passed to the callback returned by `OrphanReporter::get_handle`, which copies these
/// values into the sampled report entry.
#[derive(Debug, Clone)]
pub struct OrphanContext {
    /// Identifier of the client connection that received the response.
    pub client_id: String,
    /// Local end of the connection.
    pub local_addr: SocketAddr,
    /// Remote (server) end of the connection.
    pub peer_addr: SocketAddr,
}
41
42#[derive(Debug, Eq)]
43struct OrphanLogItem {
44 pub connection_id: String,
45 pub operation_id: String,
46 pub remote_socket: String,
47 pub local_socket: String,
48 pub server_duration: Duration,
49 pub total_server_duration: Duration,
50 pub operation_name: String,
51}
52
53impl Display for OrphanLogItem {
54 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
55 let obj = json!({
56 "last_server_duration_us": self.server_duration.as_micros() as u64,
57 "total_server_duration_us": self.total_server_duration.as_micros() as u64,
58 "operation_name": self.operation_name,
59 "last_local_id": self.connection_id,
60 "operation_id": self.operation_id,
61 "last_local_socket": self.local_socket,
62 "last_remote_socket": self.remote_socket,
63 });
64 write!(f, "{}", obj)
65 }
66}
67
68struct OrphanLogJsonEntry {
69 count: u64,
70 top_items: Vec<OrphanLogItem>,
71}
72
73impl Display for OrphanLogJsonEntry {
74 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
75 let bodies: Vec<String> = self.top_items.iter().map(|item| item.to_string()).collect();
76 write!(
77 f,
78 r#"{{"total_count":{},"top_requests":[{}]}}"#,
79 self.count,
80 bodies.join(",")
81 )
82 }
83}
84
85struct OrphanLogService(HashMap<String, OrphanLogJsonEntry>);
86
87impl Display for OrphanLogService {
88 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
89 let mut parts = Vec::new();
90 for (svc, entry) in &self.0 {
91 parts.push(format!(r#""{}":{}"#, svc, entry));
92 }
93 write!(f, "{{{}}}", parts.join(","))
94 }
95}
96
97impl PartialEq for OrphanLogItem {
99 fn eq(&self, other: &Self) -> bool {
100 self.total_server_duration == other.total_server_duration
101 }
102}
103
104impl Ord for OrphanLogItem {
105 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
106 self.total_server_duration.cmp(&other.total_server_duration)
107 }
108}
109
110impl PartialOrd for OrphanLogItem {
111 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
112 Some(self.cmp(other))
113 }
114}
115
116pub struct OrphanReporter {
118 total_count: Arc<AtomicU64>,
119 heap: Arc<RwLock<BinaryHeap<Reverse<OrphanLogItem>>>>,
120 sample_size: usize,
121 reporter_interval: Duration,
122}
123
124impl OrphanReporter {
125 pub fn new(config: OrphanReporterConfig) -> Self {
126 let heap = Arc::new(RwLock::new(BinaryHeap::with_capacity(config.sample_size)));
127 let total_count = Arc::new(AtomicU64::new(0));
128
129 let heap_clone = heap.clone();
130 let total_count_clone = total_count.clone();
131
132 tokio::spawn(async move {
133 trace!(
134 "OrphanReporter started: reporter_interval={:?}, sample_size={}",
135 config.reporter_interval,
136 config.sample_size
137 );
138 let start = Instant::now() + config.reporter_interval;
139 let mut tick = interval_at(start, config.reporter_interval);
140 tick.set_missed_tick_behavior(MissedTickBehavior::Burst);
141
142 loop {
143 tokio::select! {
144 _ = tick.tick() => {
145 let count = total_count_clone.swap(0, Ordering::Relaxed);
146 if count == 0 {
147 continue;
148 }
149 let mut write_guard = heap_clone.write().unwrap();
150 let obj = Self::create_log_object("kv".to_string(), mem::take(&mut write_guard), count);
151 warn!("Orphaned responses observed: {}", obj);
152 }
153 }
154 }
155 });
156 Self {
157 total_count,
158 heap,
159 sample_size: config.sample_size,
160 reporter_interval: config.reporter_interval,
161 }
162 }
163
164 pub fn get_handle(&self) -> Arc<dyn Fn(ResponsePacket, OrphanContext) + Send + Sync> {
165 let heap = self.heap.clone();
166 let total_count = self.total_count.clone();
167 let sample_size = self.sample_size;
168
169 Arc::new(move |msg: ResponsePacket, ctx: OrphanContext| {
170 total_count.fetch_add(1, Ordering::Relaxed);
171
172 let server_dur = msg
173 .framing_extras
174 .as_deref()
175 .and_then(|f| decode_res_ext_frames(f).ok().flatten())
176 .unwrap_or_default();
177
178 let (current_length, current_min) = {
180 let guard = heap.read().unwrap_or_else(|p| {
181 warn!("OrphanReporter heap poisoned; continuing");
182 p.into_inner()
183 });
184 (
185 guard.len(),
186 guard.peek().map(|Reverse(i)| i.total_server_duration),
187 )
188 };
189
190 let needs_write = current_length < sample_size
191 || current_min.map(|m| server_dur > m).unwrap_or(false);
192
193 if needs_write {
194 let mut write_guard = heap.write().unwrap_or_else(|p| {
195 warn!("OrphanReporter heap poisoned; continuing");
196 p.into_inner()
197 });
198
199 if write_guard.len() < sample_size {
200 write_guard.push(Reverse(OrphanLogItem {
201 connection_id: ctx.client_id,
202 operation_id: format!("0x{:x}", msg.opaque),
203 remote_socket: ctx.peer_addr.to_string(),
204 local_socket: ctx.local_addr.to_string(),
205 server_duration: server_dur,
206 total_server_duration: server_dur,
207 operation_name: format!("{:?}", msg.op_code),
208 }));
209 } else if let Some(Reverse(min)) = write_guard.peek() {
210 if server_dur > min.total_server_duration {
211 write_guard.pop();
212 write_guard.push(Reverse(OrphanLogItem {
213 connection_id: ctx.client_id,
214 operation_id: format!("0x{:x}", msg.opaque),
215 remote_socket: ctx.peer_addr.to_string(),
216 local_socket: ctx.local_addr.to_string(),
217 server_duration: server_dur,
218 total_server_duration: server_dur,
219 operation_name: format!("{:?}", msg.op_code),
220 }));
221 }
222 }
223 }
224 })
225 }
226
227 fn create_log_object(
228 service: String,
229 heap_items: BinaryHeap<Reverse<OrphanLogItem>>,
230 count: u64,
231 ) -> OrphanLogService {
232 let items: Vec<OrphanLogItem> = heap_items
233 .into_sorted_vec()
234 .into_iter()
235 .map(|Reverse(item)| item)
236 .collect();
237 let entry = OrphanLogJsonEntry {
238 count,
239 top_items: items,
240 };
241 let mut services = HashMap::new();
242 services.insert(service, entry);
243 OrphanLogService(services)
244 }
245}