1use anyhow::{Context, Result, anyhow, bail};
24use reqwest::StatusCode;
25use reqwest::blocking::Client;
26use serde::de::DeserializeOwned;
27use serde_json::Value;
28use std::time::Duration;
29
30use super::backend::WalSink;
31use super::Store;
32use super::models::{
33 AnomalyReport, CorrelateReport, LogQuery, LogRecord, MetricPoint, MetricQuery, ServiceInfo,
34 Span, SummaryReport, TraceComment, TraceQuery,
35};
36
37pub struct RemoteStore {
39 base_url: String,
40 http: Client,
41}
42
43impl RemoteStore {
44 pub fn new(base_url: impl Into<String>) -> Result<Self> {
46 Self::with_timeout(base_url, Duration::from_secs(30))
47 }
48
49 pub fn with_timeout(base_url: impl Into<String>, timeout: Duration) -> Result<Self> {
51 let http = Client::builder()
52 .timeout(timeout)
53 .build()
54 .context("building RemoteStore HTTP client")?;
55 Ok(Self {
56 base_url: base_url.into().trim_end_matches('/').to_string(),
57 http,
58 })
59 }
60
61 fn send_get(
62 &self,
63 path: &str,
64 params: &[(&str, String)],
65 ) -> Result<reqwest::blocking::Response> {
66 self.http
67 .get(format!("{}{path}", self.base_url))
68 .query(params)
69 .send()
70 .with_context(|| format!("GET {path} from {}", self.base_url))
71 }
72
73 fn get_json(&self, path: &str, params: &[(&str, String)]) -> Result<Value> {
75 let resp = self
76 .send_get(path, params)?
77 .error_for_status()
78 .with_context(|| format!("GET {path} from {}", self.base_url))?;
79 resp.json::<Value>()
80 .with_context(|| format!("decoding {path} response from {}", self.base_url))
81 }
82}
83
84fn field<T: DeserializeOwned>(mut value: Value, key: &str) -> Result<T> {
87 let v = value
88 .get_mut(key)
89 .map(Value::take)
90 .ok_or_else(|| anyhow!("response missing `{key}` field"))?;
91 serde_json::from_value(v).with_context(|| format!("deserializing `{key}`"))
92}
93
/// Appends a `last=<seconds>` query parameter when `last_seconds` is set;
/// leaves `params` untouched otherwise.
fn last_param(params: &mut Vec<(&'static str, String)>, last_seconds: Option<i64>) {
    // `Option` is iterable (zero or one item), so this pushes at most once.
    params.extend(last_seconds.map(|secs| ("last", secs.to_string())));
}
101
102impl Store for RemoteStore {
103 fn insert_spans(&self, _spans: &[Span]) -> Result<()> {
105 bail!("RemoteStore is read-only: route span ingest to the owning shard via OTLP");
106 }
107
108 fn query_traces(&self, query: &TraceQuery) -> Result<Vec<Span>> {
109 let mut params: Vec<(&str, String)> = Vec::new();
110 if let Some(ref s) = query.service {
111 params.push(("service", s.clone()));
112 }
113 if let Some(ref o) = query.operation {
114 params.push(("operation", o.clone()));
115 }
116 if let Some(d) = query.min_duration_ms {
117 params.push(("min_duration_ms", d.to_string()));
118 }
119 if let Some(d) = query.max_duration_ms {
120 params.push(("max_duration_ms", d.to_string()));
121 }
122 if let Some(ref s) = query.status {
123 params.push(("status", s.clone()));
124 }
125 last_param(&mut params, query.last_seconds);
126 if let Some(l) = query.limit {
127 params.push(("limit", l.to_string()));
128 }
129 for (k, v) in &query.attributes {
130 params.push(("attribute", format!("{k}={v}")));
131 }
132 if let Some(ref t) = query.text {
133 params.push(("text", t.clone()));
134 }
135 let body = self.get_json("/api/v1/traces", ¶ms)?;
136 field(body, "spans")
137 }
138
139 fn get_trace(&self, trace_id: &str) -> Result<Vec<Span>> {
140 let resp = self.send_get(&format!("/api/v1/traces/{trace_id}"), &[])?;
144 match resp.status() {
145 StatusCode::NOT_FOUND => Ok(Vec::new()),
146 s if s.is_success() => {
147 let body = resp
148 .json::<Value>()
149 .context("decoding get_trace response")?;
150 field(body, "spans")
151 }
152 s => bail!("get_trace {trace_id} on {}: HTTP {s}", self.base_url),
153 }
154 }
155
156 fn list_services(&self) -> Result<Vec<ServiceInfo>> {
157 let body = self.get_json("/api/v1/services", &[])?;
158 field(body, "services")
159 }
160
161 fn add_comment(
163 &self,
164 trace_id: &str,
165 span_id: Option<&str>,
166 author: &str,
167 body: &str,
168 ) -> Result<TraceComment> {
169 let mut payload = serde_json::json!({ "author": author, "body": body });
170 if let Some(s) = span_id {
171 payload["span_id"] = serde_json::json!(s);
172 }
173 let resp = self
174 .http
175 .post(format!("{}/api/v1/traces/{trace_id}/comments", self.base_url))
176 .json(&payload)
177 .send()
178 .with_context(|| format!("POST comment to {}", self.base_url))?
179 .error_for_status()
180 .with_context(|| format!("POST comment to {}", self.base_url))?
181 .json::<Value>()
182 .context("decoding add_comment response")?;
183 field(resp, "comment")
184 }
185
186 fn get_comments(&self, trace_id: &str) -> Result<Vec<TraceComment>> {
187 let body = self.get_json(&format!("/api/v1/traces/{trace_id}/comments"), &[])?;
188 field(body, "comments")
189 }
190
191 fn insert_logs(&self, _logs: &[LogRecord]) -> Result<()> {
193 bail!("RemoteStore is read-only: route log ingest to the owning shard via OTLP");
194 }
195
196 fn query_logs(&self, query: &LogQuery) -> Result<Vec<LogRecord>> {
197 let mut params: Vec<(&str, String)> = Vec::new();
198 if let Some(ref s) = query.service {
199 params.push(("service", s.clone()));
200 }
201 if let Some(ref s) = query.severity {
202 params.push(("severity", s.clone()));
203 }
204 if let Some(ref b) = query.body_contains {
205 params.push(("body_contains", b.clone()));
206 }
207 if let Some(ref t) = query.trace_id {
208 params.push(("trace_id", t.clone()));
209 }
210 last_param(&mut params, query.last_seconds);
211 if let Some(l) = query.limit {
212 params.push(("limit", l.to_string()));
213 }
214 let body = self.get_json("/api/v1/logs", ¶ms)?;
215 field(body, "logs")
216 }
217
218 fn insert_metrics(&self, _metrics: &[MetricPoint]) -> Result<()> {
220 bail!("RemoteStore is read-only: route metric ingest to the owning shard via OTLP");
221 }
222
223 fn query_metrics(&self, query: &MetricQuery) -> Result<Vec<MetricPoint>> {
224 let mut params: Vec<(&str, String)> = Vec::new();
225 if let Some(ref s) = query.service {
226 params.push(("service", s.clone()));
227 }
228 if let Some(ref n) = query.name {
229 params.push(("name", n.clone()));
230 }
231 if let Some(ref t) = query.metric_type {
232 params.push(("metric_type", t.clone()));
233 }
234 last_param(&mut params, query.last_seconds);
235 if let Some(l) = query.limit {
236 params.push(("limit", l.to_string()));
237 }
238 let body = self.get_json("/api/v1/metrics", ¶ms)?;
239 field(body, "metrics")
240 }
241
242 fn query_summary(&self, last_seconds: i64, service: Option<&str>) -> Result<SummaryReport> {
244 let mut params: Vec<(&str, String)> = vec![("last", last_seconds.to_string())];
245 if let Some(s) = service {
246 params.push(("service", s.to_string()));
247 }
248 let body = self.get_json("/api/v1/summary", ¶ms)?;
249 serde_json::from_value(body).context("deserializing SummaryReport")
250 }
251
252 fn query_anomalies(
253 &self,
254 current_seconds: i64,
255 baseline_seconds: i64,
256 service: Option<&str>,
257 ) -> Result<AnomalyReport> {
258 let mut params: Vec<(&str, String)> = vec![
259 ("last", current_seconds.to_string()),
260 ("baseline", baseline_seconds.to_string()),
261 ];
262 if let Some(s) = service {
263 params.push(("service", s.to_string()));
264 }
265 let body = self.get_json("/api/v1/anomalies", ¶ms)?;
266 serde_json::from_value(body).context("deserializing AnomalyReport")
267 }
268
269 fn query_correlate(&self, trace_id: &str) -> Result<Option<CorrelateReport>> {
270 let resp = self.send_get("/api/v1/correlate", &[("trace", trace_id.to_string())])?;
271 match resp.status() {
272 StatusCode::NOT_FOUND => Ok(None),
273 s if s.is_success() => {
274 let body = resp.json::<Value>().context("decoding correlate response")?;
275 Ok(Some(
276 serde_json::from_value(body).context("deserializing CorrelateReport")?,
277 ))
278 }
279 s => bail!("correlate {trace_id} on {}: HTTP {s}", self.base_url),
280 }
281 }
282
283 fn query_sql(&self, sql: &str) -> Result<Vec<Value>> {
284 let body = self.get_json("/api/v1/sql", &[("q", sql.to_string())])?;
285 field(body, "rows")
286 }
287
288 fn health(&self) -> Result<()> {
290 let resp = self
291 .send_get("/healthz", &[])?
292 .error_for_status()
293 .with_context(|| format!("health check on {}", self.base_url))?;
294 let _ = resp.text();
295 Ok(())
296 }
297}
298
/// Name of the HTTP header in which [`RemoteWalSink`] sends the current value
/// of its epoch counter with each shipped WAL record. The header is attached
/// only when the sink was created with an epoch.
pub const WAL_EPOCH_HEADER: &str = "x-tael-wal-epoch";
308
309pub struct RemoteWalSink {
310 url: String,
311 http: Client,
312 epoch: Option<std::sync::Arc<std::sync::atomic::AtomicU64>>,
315}
316
317impl RemoteWalSink {
318 pub fn new(base_url: impl Into<String>) -> Result<Self> {
320 Self::build(base_url, None)
321 }
322
323 pub fn with_epoch(
326 base_url: impl Into<String>,
327 epoch: std::sync::Arc<std::sync::atomic::AtomicU64>,
328 ) -> Result<Self> {
329 Self::build(base_url, Some(epoch))
330 }
331
332 fn build(
333 base_url: impl Into<String>,
334 epoch: Option<std::sync::Arc<std::sync::atomic::AtomicU64>>,
335 ) -> Result<Self> {
336 let http = Client::builder()
337 .timeout(Duration::from_secs(30))
338 .build()
339 .context("building RemoteWalSink HTTP client")?;
340 let base = base_url.into().trim_end_matches('/').to_string();
341 Ok(Self {
342 url: format!("{base}/internal/wal/records"),
343 http,
344 epoch,
345 })
346 }
347}
348
349impl WalSink for RemoteWalSink {
350 fn append_framed(&self, framed: &[u8]) -> Result<()> {
351 let mut req = self
352 .http
353 .post(&self.url)
354 .header("content-type", "application/octet-stream");
355 if let Some(epoch) = &self.epoch {
356 req = req.header(
357 WAL_EPOCH_HEADER,
358 epoch.load(std::sync::atomic::Ordering::Acquire).to_string(),
359 );
360 }
361 req.body(framed.to_vec())
362 .send()
363 .with_context(|| format!("shipping WAL record to {}", self.url))?
364 .error_for_status()
365 .with_context(|| format!("standby {} rejected WAL record", self.url))?;
366 Ok(())
367 }
368
369 fn name(&self) -> &str {
370 &self.url
371 }
372}
373
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::DuckDbStore;
    use crate::storage::models::{SpanKind, SpanStatus};
    use chrono::Utc;
    use std::collections::HashMap;
    use std::net::SocketAddr;
    use std::sync::Arc;

    /// Builds a minimal root span (`parent_span_id: None`) in trace `trace`
    /// with id `sid` for service `svc`. Start and end are both "now", and the
    /// duration field is a fixed 12.0 ms.
    fn test_span(trace: &str, sid: &str, svc: &str) -> Span {
        let now = Utc::now();
        Span {
            trace_id: trace.into(),
            span_id: sid.into(),
            parent_span_id: None,
            service: svc.into(),
            operation: "op".into(),
            start_time: now,
            end_time: now,
            duration_ms: 12.0,
            status: SpanStatus::Ok,
            attributes: HashMap::new(),
            events: vec![],
            kind: SpanKind::Internal,
            llm: None,
        }
    }

    /// Starts the REST router on an ephemeral 127.0.0.1 port and returns the
    /// bound address.
    ///
    /// The router is backed by a fresh `DuckDbStore` in a temp directory that
    /// is pre-loaded with `spans`. The server gets its own thread and tokio
    /// runtime; the thread is never joined. The calling test stays synchronous
    /// and talks to it through the blocking `RemoteStore` client.
    fn serve_with(spans: Vec<Span>) -> SocketAddr {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().to_str().unwrap().to_string();
        let (tx, rx) = std::sync::mpsc::channel();
        std::thread::spawn(move || {
            // Move the TempDir into the server thread so it is not dropped when
            // `serve_with` returns; it lives as long as this thread does.
            let _dir = dir;
            let rt = tokio::runtime::Runtime::new().unwrap();
            rt.block_on(async move {
                let store: Arc<dyn Store> = Arc::new(DuckDbStore::new(&path).unwrap());
                store.insert_spans(&spans).unwrap();
                let blobs = Arc::new(crate::storage::BlobStore::new(&path).unwrap());
                let bus = Arc::new(crate::span_bus::SpanBus::new().unwrap());
                let log_bus = Arc::new(crate::log_bus::LogBus::new().unwrap());
                let app = crate::api::rest::router(store, blobs, bus, log_bus, None);
                // Port 0 lets the OS choose a free port; send the real address
                // back before serving so the caller can connect.
                let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
                tx.send(listener.local_addr().unwrap()).unwrap();
                axum::serve(listener, app).await.unwrap();
            });
        });
        // Blocks until the listener above has been bound.
        rx.recv().unwrap()
    }

    /// Exercises the main `RemoteStore` read paths against a live server:
    /// health, trace query, per-trace fetch (including a missing trace), and
    /// service listing. It also checks that span ingest is rejected.
    #[test]
    fn remote_store_roundtrips_reads_over_http() {
        let addr = serve_with(vec![
            test_span("t1", "s1", "api"),
            test_span("t1", "s2", "db"),
            test_span("t2", "s3", "api"),
        ]);
        let remote = RemoteStore::new(format!("http://{addr}")).unwrap();

        remote.health().expect("health");

        let traces = remote.query_traces(&TraceQuery::default()).unwrap();
        assert_eq!(traces.len(), 3, "all spans returned over HTTP");

        let t1 = remote.get_trace("t1").unwrap();
        assert_eq!(t1.len(), 2);
        assert!(t1.iter().all(|s| s.trace_id == "t1"));

        // An unknown trace id must come back as an empty result, not an error.
        assert!(remote.get_trace("missing").unwrap().is_empty());

        let services = remote.list_services().unwrap();
        assert!(services.iter().any(|s| s.name == "api"));

        // RemoteStore is read-only: ingest must fail.
        assert!(remote.insert_spans(&[test_span("t3", "s4", "api")]).is_err());
    }

    /// Like `serve_with`, but serves an existing `store`. Only the blob store
    /// is created, in `data_dir`. The caller owns `data_dir`'s lifetime.
    fn serve_store(store: Arc<dyn Store>, data_dir: String) -> SocketAddr {
        let (tx, rx) = std::sync::mpsc::channel();
        std::thread::spawn(move || {
            let rt = tokio::runtime::Runtime::new().unwrap();
            rt.block_on(async move {
                let blobs = Arc::new(crate::storage::BlobStore::new(&data_dir).unwrap());
                let bus = Arc::new(crate::span_bus::SpanBus::new().unwrap());
                let log_bus = Arc::new(crate::log_bus::LogBus::new().unwrap());
                let app = crate::api::rest::router(store, blobs, bus, log_bus, None);
                let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
                tx.send(listener.local_addr().unwrap()).unwrap();
                axum::serve(listener, app).await.unwrap();
            });
        });
        rx.recv().unwrap()
    }

    /// On drop, best-effort removes `wal_files/<key>` relative to the current
    /// working directory; errors are ignored.
    /// NOTE(review): presumably this is where `TaelBackend` keeps the WAL for
    /// a given key — confirm against `with_wal_key`.
    struct WalKeyGuard(String);
    impl Drop for WalKeyGuard {
        fn drop(&mut self) {
            let _ = std::fs::remove_dir_all(format!("wal_files/{}", self.0));
        }
    }

    /// End-to-end replication test.
    ///
    /// A leader `TaelBackend` with a `RemoteWalSink` pointed at a standby's
    /// HTTP server ingests spans. The test then checks that the standby sees
    /// them, both directly and through `RemoteStore`.
    #[test]
    fn wal_replication_ships_leader_writes_to_standby_over_http() {
        use crate::storage::TaelBackend;

        // Standby: its own temp dir, a unique WAL key, and an HTTP server over it.
        let standby_dir = tempfile::tempdir().unwrap();
        let standby_path = standby_dir.path().to_str().unwrap().to_string();
        let standby_key = format!("tael-test-standby-{}", uuid::Uuid::new_v4());
        let _sg = WalKeyGuard(standby_key.clone());
        let standby = Arc::new(TaelBackend::with_wal_key(&standby_path, &standby_key).unwrap());
        let standby_addr = serve_store(Arc::clone(&standby) as Arc<dyn Store>, standby_path);

        // Leader: a separate temp dir and WAL key, with the standby as its
        // only WAL sink.
        let leader_dir = tempfile::tempdir().unwrap();
        let leader_key = format!("tael-test-leader-{}", uuid::Uuid::new_v4());
        let _lg = WalKeyGuard(leader_key.clone());
        let sink = Arc::new(RemoteWalSink::new(format!("http://{standby_addr}")).unwrap());
        let leader = TaelBackend::with_wal_key_and_sinks(
            leader_dir.path().to_str().unwrap(),
            &leader_key,
            vec![sink],
            None,
        )
        .unwrap();

        // Writes go only to the leader; the standby must receive them via the sink.
        leader
            .insert_spans(&[
                test_span("t1", "s1", "api"),
                test_span("t1", "s2", "db"),
                test_span("t2", "s3", "api"),
            ])
            .unwrap();

        // NOTE(review): asserting immediately after insert assumes replication
        // to the standby has completed by the time `insert_spans` returns.
        assert_eq!(standby.get_trace("t1").unwrap().len(), 2);
        assert_eq!(standby.get_trace("t2").unwrap().len(), 1);

        // The replicated data is also visible through the standby's REST API.
        let via_http = RemoteStore::new(format!("http://{standby_addr}")).unwrap();
        assert_eq!(via_http.query_traces(&TraceQuery::default()).unwrap().len(), 3);
    }
}