Skip to main content

tael_server/storage/
remote.rs

1//! `RemoteStore` — a [`Store`] that speaks HTTP to another `tael-server`'s REST
2//! API. This is the "remote `Store` client" the scaling/HA design calls for
3//! (`docs/tael-server-scaling-ha.md` §3, Phase 2): the thin client that lets a
4//! [`FanoutStore`](super::FanoutStore) scatter reads across N shard processes
5//! without the REST/gRPC/CLI layers above the `Store` trait changing at all.
6//!
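//! ## Synchronous over blocking HTTP
//!
//! The `Store` trait is synchronous by design (`storage/mod.rs`). We therefore
//! use [`reqwest::blocking`], whose client owns its own runtime on a dedicated
//! thread and parks the caller on a channel — consistent with the rest of the
//! engine treating `Store` calls as blocking. Note that reqwest documents the
//! blocking API as something that must *not* be executed directly within an
//! async runtime (it can panic when it tries to block), so code running on the
//! server's tokio workers should reach this store through
//! `tokio::task::spawn_blocking` or a dedicated thread.
//! NOTE(review): confirm every async call site does so.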
14//!
15//! ## Read-only
16//!
17//! There is no typed REST ingest endpoint (ingest is OTLP/gRPC), so the write
18//! methods (`insert_*`) return an error. In the sharded topology, writes are
19//! routed to the owning shard at the ingest edge (the OTel Collector hashing on
20//! `trace_id`), never through this client — see the design's §3 routing layer.
21//! Comment writes, which *do* have a REST endpoint, are supported.
22
23use anyhow::{Context, Result, anyhow, bail};
24use reqwest::StatusCode;
25use reqwest::blocking::Client;
26use serde::de::DeserializeOwned;
27use serde_json::Value;
28use std::time::Duration;
29
30use super::backend::WalSink;
31use super::Store;
32use super::models::{
33    AnomalyReport, CorrelateReport, LogQuery, LogRecord, MetricPoint, MetricQuery, ServiceInfo,
34    Span, SummaryReport, TraceComment, TraceQuery,
35};
36
/// A [`Store`] backed by another `tael-server`'s REST API over HTTP.
pub struct RemoteStore {
    /// Base URL of the remote server. The constructors strip any trailing `/`
    /// so request paths (which all start with `/`) can be appended directly.
    base_url: String,
    /// Blocking HTTP client shared by every request this store makes.
    http: Client,
}
42
43impl RemoteStore {
44    /// Connect to a `tael-server` REST endpoint, e.g. `http://shard-0:7701`.
45    pub fn new(base_url: impl Into<String>) -> Result<Self> {
46        Self::with_timeout(base_url, Duration::from_secs(30))
47    }
48
49    /// Like [`Self::new`] with an explicit per-request timeout.
50    pub fn with_timeout(base_url: impl Into<String>, timeout: Duration) -> Result<Self> {
51        let http = Client::builder()
52            .timeout(timeout)
53            .build()
54            .context("building RemoteStore HTTP client")?;
55        Ok(Self {
56            base_url: base_url.into().trim_end_matches('/').to_string(),
57            http,
58        })
59    }
60
61    fn send_get(
62        &self,
63        path: &str,
64        params: &[(&str, String)],
65    ) -> Result<reqwest::blocking::Response> {
66        self.http
67            .get(format!("{}{path}", self.base_url))
68            .query(params)
69            .send()
70            .with_context(|| format!("GET {path} from {}", self.base_url))
71    }
72
73    /// GET expecting a JSON object, surfacing non-2xx as an error.
74    fn get_json(&self, path: &str, params: &[(&str, String)]) -> Result<Value> {
75        let resp = self
76            .send_get(path, params)?
77            .error_for_status()
78            .with_context(|| format!("GET {path} from {}", self.base_url))?;
79        resp.json::<Value>()
80            .with_context(|| format!("decoding {path} response from {}", self.base_url))
81    }
82}
83
84/// Pull a named field out of a JSON envelope and deserialize it. The REST API
85/// wraps payloads, e.g. `{ "spans": [...] }`, `{ "logs": [...] }`.
86fn field<T: DeserializeOwned>(mut value: Value, key: &str) -> Result<T> {
87    let v = value
88        .get_mut(key)
89        .map(Value::take)
90        .ok_or_else(|| anyhow!("response missing `{key}` field"))?;
91    serde_json::from_value(v).with_context(|| format!("deserializing `{key}`"))
92}
93
/// Append a `last=<seconds>` query parameter when a `last_seconds` lower bound
/// is set; leave `params` untouched otherwise. The value is sent as a bare
/// integer, which the REST layer's `parse_duration_to_seconds` reads as
/// seconds.
fn last_param(params: &mut Vec<(&'static str, String)>, last_seconds: Option<i64>) {
    // An `Option` iterates over zero or one item, so `extend` pushes at most once.
    params.extend(last_seconds.map(|secs| ("last", secs.to_string())));
}
101
102impl Store for RemoteStore {
103    // ── Spans / traces ──────────────────────────────────────────────
104    fn insert_spans(&self, _spans: &[Span]) -> Result<()> {
105        bail!("RemoteStore is read-only: route span ingest to the owning shard via OTLP");
106    }
107
108    fn query_traces(&self, query: &TraceQuery) -> Result<Vec<Span>> {
109        let mut params: Vec<(&str, String)> = Vec::new();
110        if let Some(ref s) = query.service {
111            params.push(("service", s.clone()));
112        }
113        if let Some(ref o) = query.operation {
114            params.push(("operation", o.clone()));
115        }
116        if let Some(d) = query.min_duration_ms {
117            params.push(("min_duration_ms", d.to_string()));
118        }
119        if let Some(d) = query.max_duration_ms {
120            params.push(("max_duration_ms", d.to_string()));
121        }
122        if let Some(ref s) = query.status {
123            params.push(("status", s.clone()));
124        }
125        last_param(&mut params, query.last_seconds);
126        if let Some(l) = query.limit {
127            params.push(("limit", l.to_string()));
128        }
129        for (k, v) in &query.attributes {
130            params.push(("attribute", format!("{k}={v}")));
131        }
132        if let Some(ref t) = query.text {
133            params.push(("text", t.clone()));
134        }
135        let body = self.get_json("/api/v1/traces", &params)?;
136        field(body, "spans")
137    }
138
139    fn get_trace(&self, trace_id: &str) -> Result<Vec<Span>> {
140        // 404 == "no such trace" rather than an error; map it to an empty set so
141        // fan-out can union shards without a missing shard aborting the whole
142        // lookup.
143        let resp = self.send_get(&format!("/api/v1/traces/{trace_id}"), &[])?;
144        match resp.status() {
145            StatusCode::NOT_FOUND => Ok(Vec::new()),
146            s if s.is_success() => {
147                let body = resp
148                    .json::<Value>()
149                    .context("decoding get_trace response")?;
150                field(body, "spans")
151            }
152            s => bail!("get_trace {trace_id} on {}: HTTP {s}", self.base_url),
153        }
154    }
155
156    fn list_services(&self) -> Result<Vec<ServiceInfo>> {
157        let body = self.get_json("/api/v1/services", &[])?;
158        field(body, "services")
159    }
160
161    // ── Comments ────────────────────────────────────────────────────
162    fn add_comment(
163        &self,
164        trace_id: &str,
165        span_id: Option<&str>,
166        author: &str,
167        body: &str,
168    ) -> Result<TraceComment> {
169        let mut payload = serde_json::json!({ "author": author, "body": body });
170        if let Some(s) = span_id {
171            payload["span_id"] = serde_json::json!(s);
172        }
173        let resp = self
174            .http
175            .post(format!("{}/api/v1/traces/{trace_id}/comments", self.base_url))
176            .json(&payload)
177            .send()
178            .with_context(|| format!("POST comment to {}", self.base_url))?
179            .error_for_status()
180            .with_context(|| format!("POST comment to {}", self.base_url))?
181            .json::<Value>()
182            .context("decoding add_comment response")?;
183        field(resp, "comment")
184    }
185
186    fn get_comments(&self, trace_id: &str) -> Result<Vec<TraceComment>> {
187        let body = self.get_json(&format!("/api/v1/traces/{trace_id}/comments"), &[])?;
188        field(body, "comments")
189    }
190
191    // ── Logs ────────────────────────────────────────────────────────
192    fn insert_logs(&self, _logs: &[LogRecord]) -> Result<()> {
193        bail!("RemoteStore is read-only: route log ingest to the owning shard via OTLP");
194    }
195
196    fn query_logs(&self, query: &LogQuery) -> Result<Vec<LogRecord>> {
197        let mut params: Vec<(&str, String)> = Vec::new();
198        if let Some(ref s) = query.service {
199            params.push(("service", s.clone()));
200        }
201        if let Some(ref s) = query.severity {
202            params.push(("severity", s.clone()));
203        }
204        if let Some(ref b) = query.body_contains {
205            params.push(("body_contains", b.clone()));
206        }
207        if let Some(ref t) = query.trace_id {
208            params.push(("trace_id", t.clone()));
209        }
210        last_param(&mut params, query.last_seconds);
211        if let Some(l) = query.limit {
212            params.push(("limit", l.to_string()));
213        }
214        let body = self.get_json("/api/v1/logs", &params)?;
215        field(body, "logs")
216    }
217
218    // ── Metrics ─────────────────────────────────────────────────────
219    fn insert_metrics(&self, _metrics: &[MetricPoint]) -> Result<()> {
220        bail!("RemoteStore is read-only: route metric ingest to the owning shard via OTLP");
221    }
222
223    fn query_metrics(&self, query: &MetricQuery) -> Result<Vec<MetricPoint>> {
224        let mut params: Vec<(&str, String)> = Vec::new();
225        if let Some(ref s) = query.service {
226            params.push(("service", s.clone()));
227        }
228        if let Some(ref n) = query.name {
229            params.push(("name", n.clone()));
230        }
231        if let Some(ref t) = query.metric_type {
232            params.push(("metric_type", t.clone()));
233        }
234        last_param(&mut params, query.last_seconds);
235        if let Some(l) = query.limit {
236            params.push(("limit", l.to_string()));
237        }
238        let body = self.get_json("/api/v1/metrics", &params)?;
239        field(body, "metrics")
240    }
241
242    // ── Cross-signal analytics ──────────────────────────────────────
243    fn query_summary(&self, last_seconds: i64, service: Option<&str>) -> Result<SummaryReport> {
244        let mut params: Vec<(&str, String)> = vec![("last", last_seconds.to_string())];
245        if let Some(s) = service {
246            params.push(("service", s.to_string()));
247        }
248        let body = self.get_json("/api/v1/summary", &params)?;
249        serde_json::from_value(body).context("deserializing SummaryReport")
250    }
251
252    fn query_anomalies(
253        &self,
254        current_seconds: i64,
255        baseline_seconds: i64,
256        service: Option<&str>,
257    ) -> Result<AnomalyReport> {
258        let mut params: Vec<(&str, String)> = vec![
259            ("last", current_seconds.to_string()),
260            ("baseline", baseline_seconds.to_string()),
261        ];
262        if let Some(s) = service {
263            params.push(("service", s.to_string()));
264        }
265        let body = self.get_json("/api/v1/anomalies", &params)?;
266        serde_json::from_value(body).context("deserializing AnomalyReport")
267    }
268
269    fn query_correlate(&self, trace_id: &str) -> Result<Option<CorrelateReport>> {
270        let resp = self.send_get("/api/v1/correlate", &[("trace", trace_id.to_string())])?;
271        match resp.status() {
272            StatusCode::NOT_FOUND => Ok(None),
273            s if s.is_success() => {
274                let body = resp.json::<Value>().context("decoding correlate response")?;
275                Ok(Some(
276                    serde_json::from_value(body).context("deserializing CorrelateReport")?,
277                ))
278            }
279            s => bail!("correlate {trace_id} on {}: HTTP {s}", self.base_url),
280        }
281    }
282
283    fn query_sql(&self, sql: &str) -> Result<Vec<Value>> {
284        let body = self.get_json("/api/v1/sql", &[("q", sql.to_string())])?;
285        field(body, "rows")
286    }
287
288    // ── Lifecycle ───────────────────────────────────────────────────
289    fn health(&self) -> Result<()> {
290        let resp = self
291            .send_get("/healthz", &[])?
292            .error_for_status()
293            .with_context(|| format!("health check on {}", self.base_url))?;
294        let _ = resp.text();
295        Ok(())
296    }
297}
298
/// HTTP header carrying the leader's epoch, so the standby can fence out a
/// deposed leader's records (`cluster::EpochFencer`).
pub const WAL_EPOCH_HEADER: &str = "x-tael-wal-epoch";

/// A [`WalSink`] that ships framed WAL records to a standby `tael-server`'s
/// `POST /internal/wal/records` endpoint over blocking HTTP — the leader→standby
/// transport for WAL replication (`docs/tael-server-scaling-ha.md` §5.1).
/// Blocking, like [`RemoteStore`], because the WAL append path that drives it is
/// synchronous; `append_framed` returns only once the standby has applied the
/// record (the per-record ack that makes replication synchronous).
pub struct RemoteWalSink {
    /// Full URL of the standby's WAL ingest endpoint
    /// (`{base_url}/internal/wal/records`); also returned as the sink's name.
    url: String,
    /// Blocking HTTP client used to ship every record.
    http: Client,
    /// The leader's current epoch, stamped on each shipped record. `None` when
    /// running without cluster coordination (single leader, no fencing needed).
    epoch: Option<std::sync::Arc<std::sync::atomic::AtomicU64>>,
}
316
317impl RemoteWalSink {
318    /// Target a standby by its REST base URL (e.g. `http://standby-1:7701`).
319    pub fn new(base_url: impl Into<String>) -> Result<Self> {
320        Self::build(base_url, None)
321    }
322
323    /// Like [`Self::new`] but stamps the leader's current epoch (from cluster
324    /// coordination) on each shipped record for standby fencing.
325    pub fn with_epoch(
326        base_url: impl Into<String>,
327        epoch: std::sync::Arc<std::sync::atomic::AtomicU64>,
328    ) -> Result<Self> {
329        Self::build(base_url, Some(epoch))
330    }
331
332    fn build(
333        base_url: impl Into<String>,
334        epoch: Option<std::sync::Arc<std::sync::atomic::AtomicU64>>,
335    ) -> Result<Self> {
336        let http = Client::builder()
337            .timeout(Duration::from_secs(30))
338            .build()
339            .context("building RemoteWalSink HTTP client")?;
340        let base = base_url.into().trim_end_matches('/').to_string();
341        Ok(Self {
342            url: format!("{base}/internal/wal/records"),
343            http,
344            epoch,
345        })
346    }
347}
348
349impl WalSink for RemoteWalSink {
350    fn append_framed(&self, framed: &[u8]) -> Result<()> {
351        let mut req = self
352            .http
353            .post(&self.url)
354            .header("content-type", "application/octet-stream");
355        if let Some(epoch) = &self.epoch {
356            req = req.header(
357                WAL_EPOCH_HEADER,
358                epoch.load(std::sync::atomic::Ordering::Acquire).to_string(),
359            );
360        }
361        req.body(framed.to_vec())
362            .send()
363            .with_context(|| format!("shipping WAL record to {}", self.url))?
364            .error_for_status()
365            .with_context(|| format!("standby {} rejected WAL record", self.url))?;
366        Ok(())
367    }
368
369    fn name(&self) -> &str {
370        &self.url
371    }
372}
373
// End-to-end tests: each one boots a real REST server on an ephemeral port in
// a background thread and drives it through the blocking clients in this file.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::DuckDbStore;
    use crate::storage::models::{SpanKind, SpanStatus};
    use chrono::Utc;
    use std::collections::HashMap;
    use std::net::SocketAddr;
    use std::sync::Arc;

    /// Build a minimal span for trace `trace`, span id `sid` and service
    /// `svc`; every other field gets a fixed placeholder value.
    fn test_span(trace: &str, sid: &str, svc: &str) -> Span {
        let now = Utc::now();
        Span {
            trace_id: trace.into(),
            span_id: sid.into(),
            parent_span_id: None,
            service: svc.into(),
            operation: "op".into(),
            start_time: now,
            end_time: now,
            duration_ms: 12.0,
            status: SpanStatus::Ok,
            attributes: HashMap::new(),
            events: vec![],
            kind: SpanKind::Internal,
            llm: None,
        }
    }

    /// Boot a real REST server (DuckDB-backed) on an ephemeral port in its own
    /// runtime thread, seeded with `spans`, and return its address. Running the
    /// server off-thread lets the blocking `RemoteStore` calls in the test
    /// thread proceed without nesting/deadlocking a runtime.
    fn serve_with(spans: Vec<Span>) -> SocketAddr {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().to_str().unwrap().to_string();
        // The server thread reports its bound address back over this channel.
        let (tx, rx) = std::sync::mpsc::channel();
        std::thread::spawn(move || {
            // Keep the tempdir alive for the server thread's lifetime.
            let _dir = dir;
            let rt = tokio::runtime::Runtime::new().unwrap();
            rt.block_on(async move {
                let store: Arc<dyn Store> = Arc::new(DuckDbStore::new(&path).unwrap());
                store.insert_spans(&spans).unwrap();
                let blobs = Arc::new(crate::storage::BlobStore::new(&path).unwrap());
                let bus = Arc::new(crate::span_bus::SpanBus::new().unwrap());
                let log_bus = Arc::new(crate::log_bus::LogBus::new().unwrap());
                let app = crate::api::rest::router(store, blobs, bus, log_bus, None);
                // Port 0 lets the OS pick a free port.
                let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
                tx.send(listener.local_addr().unwrap()).unwrap();
                axum::serve(listener, app).await.unwrap();
            });
        });
        // Blocks until the listener is bound and its address has been sent.
        rx.recv().unwrap()
    }

    #[test]
    fn remote_store_roundtrips_reads_over_http() {
        let addr = serve_with(vec![
            test_span("t1", "s1", "api"),
            test_span("t1", "s2", "db"),
            test_span("t2", "s3", "api"),
        ]);
        let remote = RemoteStore::new(format!("http://{addr}")).unwrap();

        remote.health().expect("health");

        let traces = remote.query_traces(&TraceQuery::default()).unwrap();
        assert_eq!(traces.len(), 3, "all spans returned over HTTP");

        let t1 = remote.get_trace("t1").unwrap();
        assert_eq!(t1.len(), 2);
        assert!(t1.iter().all(|s| s.trace_id == "t1"));

        // 404 maps to an empty set, not an error.
        assert!(remote.get_trace("missing").unwrap().is_empty());

        let services = remote.list_services().unwrap();
        assert!(services.iter().any(|s| s.name == "api"));

        // Writes are rejected: ingest must go to the owning shard via OTLP.
        assert!(remote.insert_spans(&[test_span("t3", "s4", "api")]).is_err());
    }

    /// Serve an existing store over a fresh REST server thread (the store is
    /// built by the caller so it can also hold a handle to it).
    fn serve_store(store: Arc<dyn Store>, data_dir: String) -> SocketAddr {
        // The server thread reports its bound address back over this channel.
        let (tx, rx) = std::sync::mpsc::channel();
        std::thread::spawn(move || {
            let rt = tokio::runtime::Runtime::new().unwrap();
            rt.block_on(async move {
                let blobs = Arc::new(crate::storage::BlobStore::new(&data_dir).unwrap());
                let bus = Arc::new(crate::span_bus::SpanBus::new().unwrap());
                let log_bus = Arc::new(crate::log_bus::LogBus::new().unwrap());
                let app = crate::api::rest::router(store, blobs, bus, log_bus, None);
                let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
                tx.send(listener.local_addr().unwrap()).unwrap();
                axum::serve(listener, app).await.unwrap();
            });
        });
        rx.recv().unwrap()
    }

    /// Removes a walrus namespace dir on drop so test runs don't accrete state.
    struct WalKeyGuard(String);
    impl Drop for WalKeyGuard {
        fn drop(&mut self) {
            // Best-effort cleanup: a failed removal is deliberately ignored.
            let _ = std::fs::remove_dir_all(format!("wal_files/{}", self.0));
        }
    }

    #[test]
    fn wal_replication_ships_leader_writes_to_standby_over_http() {
        use crate::storage::TaelBackend;

        // A standby tael-backend, served over HTTP so its
        // /internal/wal/records endpoint is reachable. We keep a handle to
        // assert its state directly.
        let standby_dir = tempfile::tempdir().unwrap();
        let standby_path = standby_dir.path().to_str().unwrap().to_string();
        // A random key per run keeps WAL namespaces of different runs apart.
        let standby_key = format!("tael-test-standby-{}", uuid::Uuid::new_v4());
        let _sg = WalKeyGuard(standby_key.clone());
        let standby = Arc::new(TaelBackend::with_wal_key(&standby_path, &standby_key).unwrap());
        let standby_addr = serve_store(Arc::clone(&standby) as Arc<dyn Store>, standby_path);

        // A leader that ships its WAL to the standby over HTTP, synchronously
        // (required_acks = None ⇒ all standbys must ack before a write returns).
        let leader_dir = tempfile::tempdir().unwrap();
        let leader_key = format!("tael-test-leader-{}", uuid::Uuid::new_v4());
        let _lg = WalKeyGuard(leader_key.clone());
        let sink = Arc::new(RemoteWalSink::new(format!("http://{standby_addr}")).unwrap());
        let leader = TaelBackend::with_wal_key_and_sinks(
            leader_dir.path().to_str().unwrap(),
            &leader_key,
            vec![sink],
            None,
        )
        .unwrap();

        // Write to the leader only.
        leader
            .insert_spans(&[
                test_span("t1", "s1", "api"),
                test_span("t1", "s2", "db"),
                test_span("t2", "s3", "api"),
            ])
            .unwrap();

        // The standby received and applied the shipped records over HTTP.
        assert_eq!(standby.get_trace("t1").unwrap().len(), 2);
        assert_eq!(standby.get_trace("t2").unwrap().len(), 1);

        // And it serves them over its own REST API (full path works).
        let via_http = RemoteStore::new(format!("http://{standby_addr}")).unwrap();
        assert_eq!(via_http.query_traces(&TraceQuery::default()).unwrap().len(), 3);
    }
}