Skip to main content

lean_ctx/core/
telemetry_queue.rs

1//! In-process async telemetry queue for the MCP server process.
2//!
3//! MCP tool calls enqueue events without blocking; a single background Tokio
4//! task drains the channel and POSTs each event to the cloud server on a
5//! threadpool thread.
6//!
7//! Shell hooks run as short-lived separate processes that cannot share the
8//! in-process channel. They use [`fire_sync`], which caps overhead at 300 ms
9//! via a detached thread and a receive-timeout, so the user's shell command
10//! never stalls perceptibly even when the server is unreachable.
11
12use std::sync::OnceLock;
13use std::time::Duration;
14
15use tokio::sync::mpsc::{self, UnboundedSender};
16
17use crate::models::TelemetryIngestRequest;
18
19static TX: OnceLock<UnboundedSender<TelemetryIngestRequest>> = OnceLock::new();
20
21/// Enqueue a telemetry event for background delivery.
22///
23/// Returns immediately — no network I/O on the calling thread.
24/// Events enqueued before [`start_drain_task`] is called are silently dropped.
25pub fn enqueue(request: TelemetryIngestRequest) {
26    if let Some(tx) = TX.get() {
27        // UnboundedSender::send only errors when the receiver is dropped,
28        // which cannot happen while the drain task is running.
29        let _ = tx.send(request);
30    }
31}
32
33/// Spawn the background drain task inside the running Tokio runtime.
34///
35/// Must be called once at MCP server startup. Subsequent calls are no-ops;
36/// the first call wins and installs the channel sender into [`TX`].
37pub fn start_drain_task() {
38    let (tx, mut rx) = mpsc::unbounded_channel::<TelemetryIngestRequest>();
39
40    // OnceLock::set is atomic — only the first caller proceeds.
41    if TX.set(tx).is_err() {
42        return;
43    }
44
45    tokio::spawn(async move {
46        while let Some(req) = rx.recv().await {
47            // Offload the blocking HTTP POST to the threadpool so the async
48            // runtime is never stalled by network I/O.
49            tokio::task::spawn_blocking(move || {
50                let Ok(client) = crate::cloud_client::ServerClient::load() else {
51                    return;
52                };
53                let _ = client.ingest_telemetry(&req);
54            });
55        }
56    });
57}
58
59/// Send a telemetry event from a short-lived process such as a shell hook.
60///
61/// Spawns a thread for the HTTP call and waits at most 300 ms before
62/// returning so the invoking process can exit promptly. If the server is
63/// unreachable the timeout fires and the event is silently dropped — telemetry
64/// is best-effort and must never delay the user's shell commands.
65pub fn fire_sync(request: TelemetryIngestRequest) {
66    let (done_tx, done_rx) = std::sync::mpsc::channel::<()>();
67    std::thread::spawn(move || {
68        if let Ok(client) = crate::cloud_client::ServerClient::load() {
69            let _ = client.ingest_telemetry(&request);
70        }
71        let _ = done_tx.send(());
72    });
73    let _ = done_rx.recv_timeout(Duration::from_millis(300));
74}