use std::sync::OnceLock;
use std::time::Duration;
use tokio::sync::mpsc::{self, UnboundedSender};
use crate::models::TelemetryIngestRequest;
static TX: OnceLock<UnboundedSender<TelemetryIngestRequest>> = OnceLock::new();
pub fn enqueue(request: TelemetryIngestRequest) {
if let Some(tx) = TX.get() {
let _ = tx.send(request);
return;
}
let _ = persist_request(&request);
}
pub fn start_drain_task() {
let (tx, mut rx) = mpsc::unbounded_channel::<TelemetryIngestRequest>();
if TX.set(tx).is_err() {
return;
}
tokio::spawn(async move {
drain_persisted();
while let Some(req) = rx.recv().await {
tokio::task::spawn_blocking(move || {
if deliver_request(&req).is_err() {
let _ = persist_request(&req);
}
});
}
});
}
pub fn fire_sync(request: TelemetryIngestRequest) {
let (done_tx, done_rx) = std::sync::mpsc::channel::<()>();
std::thread::spawn(move || {
if deliver_request(&request).is_err() {
let _ = persist_request(&request);
}
let _ = done_tx.send(());
});
let _ = done_rx.recv_timeout(Duration::from_millis(300));
}
pub fn flush_pending() -> usize {
let Ok(entries) = crate::core::sync_outbox::load_entries() else {
return 0;
};
let count = entries.len();
drain_persisted();
count
}
fn deliver_request(request: &TelemetryIngestRequest) -> anyhow::Result<()> {
let client = crate::server_client::ServerClient::load()?;
client.ingest_telemetry(request)
}
fn persist_request(request: &TelemetryIngestRequest) -> Result<(), String> {
crate::core::sync_outbox::enqueue(
crate::core::sync_outbox::OutboxOperationKind::TelemetryIngest,
serde_json::to_value(request).map_err(|e| e.to_string())?,
)
.map(|_| ())
}
fn drain_persisted() {
let Ok(entries) = crate::core::sync_outbox::load_entries() else {
return;
};
for entry in entries {
let result = match entry.kind {
crate::core::sync_outbox::OutboxOperationKind::TelemetryIngest => {
serde_json::from_value::<TelemetryIngestRequest>(entry.payload.clone())
.map_err(anyhow::Error::from)
.and_then(|request| deliver_request(&request))
}
crate::core::sync_outbox::OutboxOperationKind::ServerToolCall => {
crate::server_client::replay_queued_server_tool_call(entry.payload.clone())
}
crate::core::sync_outbox::OutboxOperationKind::CodeIndexSync => {
crate::server_client::replay_queued_index_sync(entry.payload.clone())
}
};
match result {
Ok(()) => {
let _ = crate::core::sync_outbox::delete(&entry.id);
}
Err(error) => {
let _ = crate::core::sync_outbox::mark_failed(&entry, &error.to_string());
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::Map;
use std::io::{Read, Write};
use std::net::TcpListener;
use std::sync::mpsc;
use std::time::Duration;
#[test]
fn enqueue_persists_when_runtime_not_started() {
let _lock = crate::core::data_dir::test_env_lock();
let tmp = tempfile::tempdir().unwrap();
std::env::set_var("NEBU_CTX_DATA_DIR", tmp.path());
enqueue(TelemetryIngestRequest {
tool_name: "ctx_read".to_string(),
tokens_original: 10,
tokens_saved: 2,
duration_ms: 0,
mode: Some("test".to_string()),
repository_fingerprint: None,
checkout_binding: None,
project_slug: None,
});
let entries = crate::core::sync_outbox::load_entries().unwrap();
assert_eq!(entries.len(), 1);
assert_eq!(
entries[0].kind,
crate::core::sync_outbox::OutboxOperationKind::TelemetryIngest
);
}
#[test]
fn flush_pending_replays_all_outbox_kinds_to_server() {
let _lock = crate::core::data_dir::test_env_lock();
let tmp = tempfile::tempdir().unwrap();
std::env::set_var("NEBU_CTX_DATA_DIR", tmp.path());
std::env::set_var("NEBU_CTX_HOME", tmp.path().join("home"));
let (endpoint, received_paths) = spawn_replay_server(4);
crate::config::save_connection(&endpoint, "test-token").unwrap();
enqueue_replay_fixtures(tmp.path());
assert_eq!(flush_pending(), 3);
assert!(crate::core::sync_outbox::load_entries().unwrap().is_empty());
let mut paths = Vec::new();
for _ in 0..4 {
paths.push(received_paths.recv_timeout(Duration::from_secs(2)).unwrap());
}
assert!(paths.contains(&"/v1/telemetry/ingest".to_string()));
assert!(paths.contains(&"/v1/tools/call".to_string()));
assert!(paths.contains(&"/v1/projects/resolve".to_string()));
assert!(paths.contains(&"/v1/index/sync".to_string()));
}
fn enqueue_replay_fixtures(root: &std::path::Path) {
let context = replay_project_context(root);
crate::core::sync_outbox::enqueue(
crate::core::sync_outbox::OutboxOperationKind::TelemetryIngest,
serde_json::to_value(TelemetryIngestRequest {
tool_name: "ctx_read".to_string(),
tokens_original: 100,
tokens_saved: 40,
duration_ms: 7,
mode: Some("test".to_string()),
repository_fingerprint: Some(context.fingerprint.clone()),
checkout_binding: Some(context.checkout_binding.clone()),
project_slug: Some(context.project_slug.clone()),
})
.unwrap(),
)
.unwrap();
crate::core::sync_outbox::enqueue(
crate::core::sync_outbox::OutboxOperationKind::ServerToolCall,
serde_json::to_value(crate::server_client::QueuedServerToolCall {
tool_name: "ctx_brain".to_string(),
arguments: Map::from_iter([
("action".to_string(), serde_json::json!("store")),
("key".to_string(), serde_json::json!("session-test")),
("value".to_string(), serde_json::json!("replayed")),
]),
project_context: (&context).into(),
})
.unwrap(),
)
.unwrap();
crate::core::sync_outbox::enqueue(
crate::core::sync_outbox::OutboxOperationKind::CodeIndexSync,
serde_json::to_value(crate::server_client::QueuedIndexSync {
project_context: (&context).into(),
files: vec![crate::server_client::IndexSyncFile {
path: "src/lib.rs".to_string(),
hash: "abc".to_string(),
language: "rust".to_string(),
line_count: 8,
token_count: 30,
exports: vec!["run".to_string()],
summary: "library".to_string(),
}],
symbols: vec![crate::server_client::IndexSyncSymbol {
file_path: "src/lib.rs".to_string(),
name: "run".to_string(),
kind: "function".to_string(),
start_line: 1,
end_line: 3,
is_exported: true,
}],
edges: vec![crate::server_client::IndexSyncEdge {
from_symbol: "run".to_string(),
to_symbol: "helper".to_string(),
kind: "calls".to_string(),
}],
})
.unwrap(),
)
.unwrap();
}
fn replay_project_context(root: &std::path::Path) -> crate::models::ProjectContext {
crate::models::ProjectContext {
project_slug: "sync-test".to_string(),
project_root: root.to_string_lossy().to_string(),
fingerprint: crate::models::RepositoryFingerprint {
remote_url: Some("https://github.com/example/sync-test.git".to_string()),
host: Some("github.com".to_string()),
owner: Some("example".to_string()),
repo_name: Some("sync-test".to_string()),
default_branch: Some("main".to_string()),
},
checkout_binding: crate::models::CheckoutBinding::default(),
project_metadata: None,
}
}
fn spawn_replay_server(expected_requests: usize) -> (String, mpsc::Receiver<String>) {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let endpoint = format!("http://{}", listener.local_addr().unwrap());
let (tx, rx) = mpsc::channel();
std::thread::spawn(move || {
for _ in 0..expected_requests {
let Ok((mut stream, _)) = listener.accept() else {
break;
};
stream
.set_read_timeout(Some(Duration::from_secs(2)))
.unwrap();
let path = read_request_path(&mut stream);
let body = response_body_for(&path);
let response = format!(
"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
body.len(),
body
);
stream.write_all(response.as_bytes()).unwrap();
tx.send(path).unwrap();
}
});
(endpoint, rx)
}
fn read_request_path(stream: &mut std::net::TcpStream) -> String {
let mut buffer = Vec::new();
let mut chunk = [0u8; 512];
loop {
let Ok(read) = stream.read(&mut chunk) else {
break;
};
if read == 0 {
break;
}
buffer.extend_from_slice(&chunk[..read]);
if request_complete(&buffer) {
break;
}
}
let request = String::from_utf8_lossy(&buffer);
request
.lines()
.next()
.and_then(|line| line.split_whitespace().nth(1))
.unwrap_or("/")
.to_string()
}
fn request_complete(buffer: &[u8]) -> bool {
let Some(header_end) = buffer.windows(4).position(|window| window == b"\r\n\r\n") else {
return false;
};
let headers = String::from_utf8_lossy(&buffer[..header_end]);
let content_length = headers
.lines()
.find_map(|line| line.split_once(':'))
.filter(|(name, _)| name.eq_ignore_ascii_case("content-length"))
.and_then(|(_, value)| value.trim().parse::<usize>().ok())
.unwrap_or(0);
buffer.len() >= header_end + 4 + content_length
}
fn response_body_for(path: &str) -> &'static str {
match path {
"/v1/projects/resolve" => {
r#"{"project":{"project_id":"proj_sync_test","slug":"sync-test","fingerprint":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"},"checkout_bound":true}"#
}
"/v1/tools/call" => r#"{"result":{"ok":true}}"#,
"/v1/telemetry/ingest" | "/v1/index/sync" => r#"{"ok":true}"#,
_ => r#"{"ok":true}"#,
}
}
}