Skip to main content

couchbase_core/
orphan_reporter.rs

1/*
2 *
3 *  * Copyright (c) 2025 Couchbase, Inc.
4 *  *
5 *  * Licensed under the Apache License, Version 2.0 (the "License");
6 *  * you may not use this file except in compliance with the License.
7 *  * You may obtain a copy of the License at
8 *  *
9 *  *    http://www.apache.org/licenses/LICENSE-2.0
10 *  *
11 *  * Unless required by applicable law or agreed to in writing, software
12 *  * distributed under the License is distributed on an "AS IS" BASIS,
13 *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *  * See the License for the specific language governing permissions and
15 *  * limitations under the License.
16 *
17 */
18
19use crate::memdx::extframe::decode_res_ext_frames;
20use crate::memdx::packet::ResponsePacket;
21use crate::options::orphan_reporter::OrphanReporterConfig;
22use serde_json::json;
23use std::cmp::Reverse;
24use std::collections::BinaryHeap;
25use std::collections::HashMap;
26use std::fmt::{Display, Formatter, Result as FmtResult};
27use std::mem;
28use std::net::SocketAddr;
29use std::sync::atomic::{AtomicU64, Ordering};
30use std::sync::{Arc, RwLock};
31use std::time::Duration;
32use tokio::time::{interval_at, Instant, MissedTickBehavior};
33use tracing::{trace, warn};
34
/// Connection details handed to the orphan handler together with the orphaned response.
#[derive(Clone, Debug)]
pub struct OrphanContext {
    /// Identifier of the client connection; logged as `last_local_id`.
    pub client_id: String,
    /// Local end of the connection; logged as `last_local_socket`.
    pub local_addr: SocketAddr,
    /// Remote end of the connection; logged as `last_remote_socket`.
    pub peer_addr: SocketAddr,
}
41
42#[derive(Debug, Eq)]
43struct OrphanLogItem {
44    pub connection_id: String,
45    pub operation_id: String,
46    pub remote_socket: String,
47    pub local_socket: String,
48    pub server_duration: Duration,
49    pub total_server_duration: Duration,
50    pub operation_name: String,
51}
52
53impl Display for OrphanLogItem {
54    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
55        let obj = json!({
56            "last_server_duration_us": self.server_duration.as_micros() as u64,
57            "total_server_duration_us": self.total_server_duration.as_micros() as u64,
58            "operation_name": self.operation_name,
59            "last_local_id": self.connection_id,
60            "operation_id": self.operation_id,
61            "last_local_socket": self.local_socket,
62            "last_remote_socket": self.remote_socket,
63        });
64        write!(f, "{}", obj)
65    }
66}
67
68struct OrphanLogJsonEntry {
69    count: u64,
70    top_items: Vec<OrphanLogItem>,
71}
72
73impl Display for OrphanLogJsonEntry {
74    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
75        let bodies: Vec<String> = self.top_items.iter().map(|item| item.to_string()).collect();
76        write!(
77            f,
78            r#"{{"total_count":{},"top_requests":[{}]}}"#,
79            self.count,
80            bodies.join(",")
81        )
82    }
83}
84
85struct OrphanLogService(HashMap<String, OrphanLogJsonEntry>);
86
87impl Display for OrphanLogService {
88    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
89        let mut parts = Vec::new();
90        for (svc, entry) in &self.0 {
91            parts.push(format!(r#""{}":{}"#, svc, entry));
92        }
93        write!(f, "{{{}}}", parts.join(","))
94    }
95}
96
97// Once we have total_duration added we will order on that
98impl PartialEq for OrphanLogItem {
99    fn eq(&self, other: &Self) -> bool {
100        self.total_server_duration == other.total_server_duration
101    }
102}
103
104impl Ord for OrphanLogItem {
105    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
106        self.total_server_duration.cmp(&other.total_server_duration)
107    }
108}
109
110impl PartialOrd for OrphanLogItem {
111    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
112        Some(self.cmp(other))
113    }
114}
115
116// Orphan reporter - Currently only handles/receives KV orphans
117pub struct OrphanReporter {
118    total_count: Arc<AtomicU64>,
119    heap: Arc<RwLock<BinaryHeap<Reverse<OrphanLogItem>>>>,
120    sample_size: usize,
121    reporter_interval: Duration,
122}
123
124impl OrphanReporter {
125    pub fn new(config: OrphanReporterConfig) -> Self {
126        let heap = Arc::new(RwLock::new(BinaryHeap::with_capacity(config.sample_size)));
127        let total_count = Arc::new(AtomicU64::new(0));
128
129        let heap_clone = heap.clone();
130        let total_count_clone = total_count.clone();
131
132        tokio::spawn(async move {
133            trace!(
134                "OrphanReporter started: reporter_interval={:?}, sample_size={}",
135                config.reporter_interval,
136                config.sample_size
137            );
138            let start = Instant::now() + config.reporter_interval;
139            let mut tick = interval_at(start, config.reporter_interval);
140            tick.set_missed_tick_behavior(MissedTickBehavior::Burst);
141
142            loop {
143                tokio::select! {
144                    _ = tick.tick() => {
145                        let count = total_count_clone.swap(0, Ordering::Relaxed);
146                        if count == 0 {
147                            continue;
148                        }
149                        let mut write_guard = heap_clone.write().unwrap();
150                        let obj = Self::create_log_object("kv".to_string(), mem::take(&mut write_guard), count);
151                        warn!("Orphaned responses observed: {}", obj);
152                    }
153                }
154            }
155        });
156        Self {
157            total_count,
158            heap,
159            sample_size: config.sample_size,
160            reporter_interval: config.reporter_interval,
161        }
162    }
163
164    pub fn get_handle(&self) -> Arc<dyn Fn(ResponsePacket, OrphanContext) + Send + Sync> {
165        let heap = self.heap.clone();
166        let total_count = self.total_count.clone();
167        let sample_size = self.sample_size;
168
169        Arc::new(move |msg: ResponsePacket, ctx: OrphanContext| {
170            total_count.fetch_add(1, Ordering::Relaxed);
171
172            let server_dur = msg
173                .framing_extras
174                .as_deref()
175                .and_then(|f| decode_res_ext_frames(f).ok().flatten())
176                .unwrap_or_default();
177
178            // Read-only
179            let (current_length, current_min) = {
180                let guard = heap.read().unwrap_or_else(|p| {
181                    warn!("OrphanReporter heap poisoned; continuing");
182                    p.into_inner()
183                });
184                (
185                    guard.len(),
186                    guard.peek().map(|Reverse(i)| i.total_server_duration),
187                )
188            };
189
190            let needs_write = current_length < sample_size
191                || current_min.map(|m| server_dur > m).unwrap_or(false);
192
193            if needs_write {
194                let mut write_guard = heap.write().unwrap_or_else(|p| {
195                    warn!("OrphanReporter heap poisoned; continuing");
196                    p.into_inner()
197                });
198
199                if write_guard.len() < sample_size {
200                    write_guard.push(Reverse(OrphanLogItem {
201                        connection_id: ctx.client_id,
202                        operation_id: format!("0x{:x}", msg.opaque),
203                        remote_socket: ctx.peer_addr.to_string(),
204                        local_socket: ctx.local_addr.to_string(),
205                        server_duration: server_dur,
206                        total_server_duration: server_dur,
207                        operation_name: format!("{:?}", msg.op_code),
208                    }));
209                } else if let Some(Reverse(min)) = write_guard.peek() {
210                    if server_dur > min.total_server_duration {
211                        write_guard.pop();
212                        write_guard.push(Reverse(OrphanLogItem {
213                            connection_id: ctx.client_id,
214                            operation_id: format!("0x{:x}", msg.opaque),
215                            remote_socket: ctx.peer_addr.to_string(),
216                            local_socket: ctx.local_addr.to_string(),
217                            server_duration: server_dur,
218                            total_server_duration: server_dur,
219                            operation_name: format!("{:?}", msg.op_code),
220                        }));
221                    }
222                }
223            }
224        })
225    }
226
227    fn create_log_object(
228        service: String,
229        heap_items: BinaryHeap<Reverse<OrphanLogItem>>,
230        count: u64,
231    ) -> OrphanLogService {
232        let items: Vec<OrphanLogItem> = heap_items
233            .into_sorted_vec()
234            .into_iter()
235            .map(|Reverse(item)| item)
236            .collect();
237        let entry = OrphanLogJsonEntry {
238            count,
239            top_items: items,
240        };
241        let mut services = HashMap::new();
242        services.insert(service, entry);
243        OrphanLogService(services)
244    }
245}