Skip to main content

tael_server/storage/
remote.rs

1//! `RemoteStore` — a [`Store`] that speaks HTTP to another `tael-server`'s REST
2//! API. This is the "remote `Store` client" the scaling/HA design calls for
3//! (`docs/tael-server-scaling-ha.md` §3, Phase 2): the thin client that lets a
4//! [`FanoutStore`](super::FanoutStore) scatter reads across N shard processes
5//! without the REST/gRPC/CLI layers above the `Store` trait changing at all.
6//!
7//! ## Synchronous over blocking HTTP
8//!
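//! The `Store` trait is synchronous by design (`storage/mod.rs`). We therefore
//! use [`reqwest::blocking`], whose client owns its own runtime on a dedicated
//! thread and parks the caller on a channel — safe to call from inside the
//! server's tokio workers, and consistent with the rest of the engine treating
//! `Store` calls as blocking.
//!
//! NOTE(review): reqwest's own documentation says the `reqwest::blocking` API
//! must not be executed directly within an async runtime (it can panic when it
//! tries to block). Calls that originate on a tokio worker presumably need to
//! be wrapped in `tokio::task::spawn_blocking` — confirm against the call sites.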
14//!
15//! ## Read-only
16//!
17//! There is no typed REST ingest endpoint (ingest is OTLP/gRPC), so the write
18//! methods (`insert_*`) return an error. In the sharded topology, writes are
19//! routed to the owning shard at the ingest edge (the OTel Collector hashing on
20//! `trace_id`), never through this client — see the design's §3 routing layer.
21//! Comment writes, which *do* have a REST endpoint, are supported.
22
23use anyhow::{Context, Result, anyhow, bail};
24use reqwest::StatusCode;
25use reqwest::blocking::Client;
26use serde::de::DeserializeOwned;
27use serde_json::Value;
28use std::time::Duration;
29
30use super::Store;
31use super::backend::WalSink;
32use super::models::{
33    AnomalyReport, CorrelateReport, LogQuery, LogRecord, MetricPoint, MetricQuery, ServiceInfo,
34    Span, SummaryReport, TraceComment, TraceQuery,
35};
36
37/// A [`Store`] backed by another `tael-server`'s REST API over HTTP.
38pub struct RemoteStore {
39    base_url: String,
40    http: Client,
41}
42
43impl RemoteStore {
44    /// Connect to a `tael-server` REST endpoint, e.g. `http://shard-0:7701`.
45    pub fn new(base_url: impl Into<String>) -> Result<Self> {
46        Self::with_timeout(base_url, Duration::from_secs(30))
47    }
48
49    /// Like [`Self::new`] with an explicit per-request timeout.
50    pub fn with_timeout(base_url: impl Into<String>, timeout: Duration) -> Result<Self> {
51        let http = Client::builder()
52            .timeout(timeout)
53            .build()
54            .context("building RemoteStore HTTP client")?;
55        Ok(Self {
56            base_url: base_url.into().trim_end_matches('/').to_string(),
57            http,
58        })
59    }
60
61    fn send_get(
62        &self,
63        path: &str,
64        params: &[(&str, String)],
65    ) -> Result<reqwest::blocking::Response> {
66        self.http
67            .get(format!("{}{path}", self.base_url))
68            .query(params)
69            .send()
70            .with_context(|| format!("GET {path} from {}", self.base_url))
71    }
72
73    /// GET expecting a JSON object, surfacing non-2xx as an error.
74    fn get_json(&self, path: &str, params: &[(&str, String)]) -> Result<Value> {
75        let resp = self
76            .send_get(path, params)?
77            .error_for_status()
78            .with_context(|| format!("GET {path} from {}", self.base_url))?;
79        resp.json::<Value>()
80            .with_context(|| format!("decoding {path} response from {}", self.base_url))
81    }
82}
83
84/// Pull a named field out of a JSON envelope and deserialize it. The REST API
85/// wraps payloads, e.g. `{ "spans": [...] }`, `{ "logs": [...] }`.
86fn field<T: DeserializeOwned>(mut value: Value, key: &str) -> Result<T> {
87    let v = value
88        .get_mut(key)
89        .map(Value::take)
90        .ok_or_else(|| anyhow!("response missing `{key}` field"))?;
91    serde_json::from_value(v).with_context(|| format!("deserializing `{key}`"))
92}
93
/// Append a `last=<seconds>` query parameter when a `last_seconds` lower bound
/// is present; leave `params` untouched otherwise. The REST layer's
/// `parse_duration_to_seconds` accepts a bare integer as seconds.
fn last_param(params: &mut Vec<(&'static str, String)>, last_seconds: Option<i64>) {
    // An `Option` iterates over zero or one item, so `extend` is a no-op for `None`.
    params.extend(last_seconds.map(|secs| ("last", secs.to_string())));
}
101
102impl Store for RemoteStore {
103    // ── Spans / traces ──────────────────────────────────────────────
104    fn insert_spans(&self, _spans: &[Span]) -> Result<()> {
105        bail!("RemoteStore is read-only: route span ingest to the owning shard via OTLP");
106    }
107
108    fn query_traces(&self, query: &TraceQuery) -> Result<Vec<Span>> {
109        let mut params: Vec<(&str, String)> = Vec::new();
110        if let Some(ref s) = query.service {
111            params.push(("service", s.clone()));
112        }
113        if let Some(ref o) = query.operation {
114            params.push(("operation", o.clone()));
115        }
116        if let Some(d) = query.min_duration_ms {
117            params.push(("min_duration_ms", d.to_string()));
118        }
119        if let Some(d) = query.max_duration_ms {
120            params.push(("max_duration_ms", d.to_string()));
121        }
122        if let Some(ref s) = query.status {
123            params.push(("status", s.clone()));
124        }
125        last_param(&mut params, query.last_seconds);
126        if let Some(l) = query.limit {
127            params.push(("limit", l.to_string()));
128        }
129        for (k, v) in &query.attributes {
130            params.push(("attribute", format!("{k}={v}")));
131        }
132        if let Some(ref t) = query.text {
133            params.push(("text", t.clone()));
134        }
135        let body = self.get_json("/api/v1/traces", &params)?;
136        field(body, "spans")
137    }
138
139    fn get_trace(&self, trace_id: &str) -> Result<Vec<Span>> {
140        // 404 == "no such trace" rather than an error; map it to an empty set so
141        // fan-out can union shards without a missing shard aborting the whole
142        // lookup.
143        let resp = self.send_get(&format!("/api/v1/traces/{trace_id}"), &[])?;
144        match resp.status() {
145            StatusCode::NOT_FOUND => Ok(Vec::new()),
146            s if s.is_success() => {
147                let body = resp
148                    .json::<Value>()
149                    .context("decoding get_trace response")?;
150                field(body, "spans")
151            }
152            s => bail!("get_trace {trace_id} on {}: HTTP {s}", self.base_url),
153        }
154    }
155
156    fn list_services(&self) -> Result<Vec<ServiceInfo>> {
157        let body = self.get_json("/api/v1/services", &[])?;
158        field(body, "services")
159    }
160
161    // ── Comments ────────────────────────────────────────────────────
162    fn add_comment(
163        &self,
164        trace_id: &str,
165        span_id: Option<&str>,
166        author: &str,
167        body: &str,
168    ) -> Result<TraceComment> {
169        let mut payload = serde_json::json!({ "author": author, "body": body });
170        if let Some(s) = span_id {
171            payload["span_id"] = serde_json::json!(s);
172        }
173        let resp = self
174            .http
175            .post(format!(
176                "{}/api/v1/traces/{trace_id}/comments",
177                self.base_url
178            ))
179            .json(&payload)
180            .send()
181            .with_context(|| format!("POST comment to {}", self.base_url))?
182            .error_for_status()
183            .with_context(|| format!("POST comment to {}", self.base_url))?
184            .json::<Value>()
185            .context("decoding add_comment response")?;
186        field(resp, "comment")
187    }
188
189    fn get_comments(&self, trace_id: &str) -> Result<Vec<TraceComment>> {
190        let body = self.get_json(&format!("/api/v1/traces/{trace_id}/comments"), &[])?;
191        field(body, "comments")
192    }
193
194    // ── Logs ────────────────────────────────────────────────────────
195    fn insert_logs(&self, _logs: &[LogRecord]) -> Result<()> {
196        bail!("RemoteStore is read-only: route log ingest to the owning shard via OTLP");
197    }
198
199    fn query_logs(&self, query: &LogQuery) -> Result<Vec<LogRecord>> {
200        let mut params: Vec<(&str, String)> = Vec::new();
201        if let Some(ref s) = query.service {
202            params.push(("service", s.clone()));
203        }
204        if let Some(ref s) = query.severity {
205            params.push(("severity", s.clone()));
206        }
207        if let Some(ref b) = query.body_contains {
208            params.push(("body_contains", b.clone()));
209        }
210        if let Some(ref t) = query.trace_id {
211            params.push(("trace_id", t.clone()));
212        }
213        last_param(&mut params, query.last_seconds);
214        if let Some(l) = query.limit {
215            params.push(("limit", l.to_string()));
216        }
217        let body = self.get_json("/api/v1/logs", &params)?;
218        field(body, "logs")
219    }
220
221    // ── Metrics ─────────────────────────────────────────────────────
222    fn insert_metrics(&self, _metrics: &[MetricPoint]) -> Result<()> {
223        bail!("RemoteStore is read-only: route metric ingest to the owning shard via OTLP");
224    }
225
226    fn query_metrics(&self, query: &MetricQuery) -> Result<Vec<MetricPoint>> {
227        let mut params: Vec<(&str, String)> = Vec::new();
228        if let Some(ref s) = query.service {
229            params.push(("service", s.clone()));
230        }
231        if let Some(ref n) = query.name {
232            params.push(("name", n.clone()));
233        }
234        if let Some(ref t) = query.metric_type {
235            params.push(("metric_type", t.clone()));
236        }
237        last_param(&mut params, query.last_seconds);
238        if let Some(l) = query.limit {
239            params.push(("limit", l.to_string()));
240        }
241        let body = self.get_json("/api/v1/metrics", &params)?;
242        field(body, "metrics")
243    }
244
245    // ── Cross-signal analytics ──────────────────────────────────────
246    fn query_summary(&self, last_seconds: i64, service: Option<&str>) -> Result<SummaryReport> {
247        let mut params: Vec<(&str, String)> = vec![("last", last_seconds.to_string())];
248        if let Some(s) = service {
249            params.push(("service", s.to_string()));
250        }
251        let body = self.get_json("/api/v1/summary", &params)?;
252        serde_json::from_value(body).context("deserializing SummaryReport")
253    }
254
255    fn query_anomalies(
256        &self,
257        current_seconds: i64,
258        baseline_seconds: i64,
259        service: Option<&str>,
260    ) -> Result<AnomalyReport> {
261        let mut params: Vec<(&str, String)> = vec![
262            ("last", current_seconds.to_string()),
263            ("baseline", baseline_seconds.to_string()),
264        ];
265        if let Some(s) = service {
266            params.push(("service", s.to_string()));
267        }
268        let body = self.get_json("/api/v1/anomalies", &params)?;
269        serde_json::from_value(body).context("deserializing AnomalyReport")
270    }
271
272    fn query_correlate(&self, trace_id: &str) -> Result<Option<CorrelateReport>> {
273        let resp = self.send_get("/api/v1/correlate", &[("trace", trace_id.to_string())])?;
274        match resp.status() {
275            StatusCode::NOT_FOUND => Ok(None),
276            s if s.is_success() => {
277                let body = resp
278                    .json::<Value>()
279                    .context("decoding correlate response")?;
280                Ok(Some(
281                    serde_json::from_value(body).context("deserializing CorrelateReport")?,
282                ))
283            }
284            s => bail!("correlate {trace_id} on {}: HTTP {s}", self.base_url),
285        }
286    }
287
288    fn query_sql(&self, sql: &str) -> Result<Vec<Value>> {
289        let body = self.get_json("/api/v1/sql", &[("q", sql.to_string())])?;
290        field(body, "rows")
291    }
292
293    // ── Lifecycle ───────────────────────────────────────────────────
294    fn health(&self) -> Result<()> {
295        let resp = self
296            .send_get("/healthz", &[])?
297            .error_for_status()
298            .with_context(|| format!("health check on {}", self.base_url))?;
299        let _ = resp.text();
300        Ok(())
301    }
302}
303
304/// A [`WalSink`] that ships framed WAL records to a standby `tael-server`'s
305/// `POST /internal/wal/records` endpoint over blocking HTTP — the leader→standby
306/// transport for WAL replication (`docs/tael-server-scaling-ha.md` §5.1).
307/// Blocking, like [`RemoteStore`], because the WAL append path that drives it is
308/// synchronous; `append_framed` returns only once the standby has applied the
309/// record (the per-record ack that makes replication synchronous).
310/// HTTP header carrying the leader's epoch, so the standby can fence out a
311/// deposed leader's records (`cluster::EpochFencer`).
312pub const WAL_EPOCH_HEADER: &str = "x-tael-wal-epoch";
313
314pub struct RemoteWalSink {
315    url: String,
316    http: Client,
317    /// The leader's current epoch, stamped on each shipped record. `None` when
318    /// running without cluster coordination (single leader, no fencing needed).
319    epoch: Option<std::sync::Arc<std::sync::atomic::AtomicU64>>,
320}
321
322impl RemoteWalSink {
323    /// Target a standby by its REST base URL (e.g. `http://standby-1:7701`).
324    pub fn new(base_url: impl Into<String>) -> Result<Self> {
325        Self::build(base_url, None)
326    }
327
328    /// Like [`Self::new`] but stamps the leader's current epoch (from cluster
329    /// coordination) on each shipped record for standby fencing.
330    pub fn with_epoch(
331        base_url: impl Into<String>,
332        epoch: std::sync::Arc<std::sync::atomic::AtomicU64>,
333    ) -> Result<Self> {
334        Self::build(base_url, Some(epoch))
335    }
336
337    fn build(
338        base_url: impl Into<String>,
339        epoch: Option<std::sync::Arc<std::sync::atomic::AtomicU64>>,
340    ) -> Result<Self> {
341        let http = Client::builder()
342            .timeout(Duration::from_secs(30))
343            .build()
344            .context("building RemoteWalSink HTTP client")?;
345        let base = base_url.into().trim_end_matches('/').to_string();
346        Ok(Self {
347            url: format!("{base}/internal/wal/records"),
348            http,
349            epoch,
350        })
351    }
352}
353
354impl WalSink for RemoteWalSink {
355    fn append_framed(&self, framed: &[u8]) -> Result<()> {
356        let mut req = self
357            .http
358            .post(&self.url)
359            .header("content-type", "application/octet-stream");
360        if let Some(epoch) = &self.epoch {
361            req = req.header(
362                WAL_EPOCH_HEADER,
363                epoch.load(std::sync::atomic::Ordering::Acquire).to_string(),
364            );
365        }
366        req.body(framed.to_vec())
367            .send()
368            .with_context(|| format!("shipping WAL record to {}", self.url))?
369            .error_for_status()
370            .with_context(|| format!("standby {} rejected WAL record", self.url))?;
371        Ok(())
372    }
373
374    fn name(&self) -> &str {
375        &self.url
376    }
377}
378
379#[cfg(test)]
380mod tests {
381    use super::*;
382    use crate::storage::models::{SpanKind, SpanStatus};
383    use chrono::Utc;
384    use std::collections::HashMap;
385    use std::net::SocketAddr;
386    use std::sync::Arc;
387
388    fn test_span(trace: &str, sid: &str, svc: &str) -> Span {
389        let now = Utc::now();
390        Span {
391            trace_id: trace.into(),
392            span_id: sid.into(),
393            parent_span_id: None,
394            service: svc.into(),
395            operation: "op".into(),
396            start_time: now,
397            end_time: now,
398            duration_ms: 12.0,
399            status: SpanStatus::Ok,
400            attributes: HashMap::new(),
401            events: vec![],
402            kind: SpanKind::Internal,
403            llm: None,
404        }
405    }
406
407    /// Boot a real REST server on an ephemeral port in its own
408    /// runtime thread, seeded with `spans`, and return its address. Running the
409    /// server off-thread lets the blocking `RemoteStore` calls in the test
410    /// thread proceed without nesting/deadlocking a runtime.
411    fn serve_with(spans: Vec<Span>) -> SocketAddr {
412        let dir = tempfile::tempdir().unwrap();
413        let path = dir.path().to_str().unwrap().to_string();
414        let (tx, rx) = std::sync::mpsc::channel();
415        std::thread::spawn(move || {
416            // Keep the tempdir alive for the server thread's lifetime.
417            let _dir = dir;
418            let rt = tokio::runtime::Runtime::new().unwrap();
419            rt.block_on(async move {
420                let store: Arc<dyn Store> = Arc::new(
421                    crate::storage::TaelBackend::with_wal_key(
422                        &path,
423                        &format!("tael-test-remote-{}", uuid::Uuid::new_v4()),
424                    )
425                    .unwrap(),
426                );
427                store.insert_spans(&spans).unwrap();
428                let blobs = Arc::new(crate::storage::BlobStore::new(&path).unwrap());
429                let bus = Arc::new(crate::span_bus::SpanBus::new().unwrap());
430                let log_bus = Arc::new(crate::log_bus::LogBus::new().unwrap());
431                let app = crate::api::rest::router(store, blobs, bus, log_bus, None);
432                let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
433                tx.send(listener.local_addr().unwrap()).unwrap();
434                axum::serve(listener, app).await.unwrap();
435            });
436        });
437        rx.recv().unwrap()
438    }
439
440    #[test]
441    fn remote_store_roundtrips_reads_over_http() {
442        let addr = serve_with(vec![
443            test_span("t1", "s1", "api"),
444            test_span("t1", "s2", "db"),
445            test_span("t2", "s3", "api"),
446        ]);
447        let remote = RemoteStore::new(format!("http://{addr}")).unwrap();
448
449        remote.health().expect("health");
450
451        let traces = remote.query_traces(&TraceQuery::default()).unwrap();
452        assert_eq!(traces.len(), 3, "all spans returned over HTTP");
453
454        let t1 = remote.get_trace("t1").unwrap();
455        assert_eq!(t1.len(), 2);
456        assert!(t1.iter().all(|s| s.trace_id == "t1"));
457
458        // 404 maps to an empty set, not an error.
459        assert!(remote.get_trace("missing").unwrap().is_empty());
460
461        let services = remote.list_services().unwrap();
462        assert!(services.iter().any(|s| s.name == "api"));
463
464        // Writes are rejected: ingest must go to the owning shard via OTLP.
465        assert!(
466            remote
467                .insert_spans(&[test_span("t3", "s4", "api")])
468                .is_err()
469        );
470    }
471
472    /// Serve an existing store over a fresh REST server thread (the store is
473    /// built by the caller so it can also hold a handle to it).
474    fn serve_store(store: Arc<dyn Store>, data_dir: String) -> SocketAddr {
475        let (tx, rx) = std::sync::mpsc::channel();
476        std::thread::spawn(move || {
477            let rt = tokio::runtime::Runtime::new().unwrap();
478            rt.block_on(async move {
479                let blobs = Arc::new(crate::storage::BlobStore::new(&data_dir).unwrap());
480                let bus = Arc::new(crate::span_bus::SpanBus::new().unwrap());
481                let log_bus = Arc::new(crate::log_bus::LogBus::new().unwrap());
482                let app = crate::api::rest::router(store, blobs, bus, log_bus, None);
483                let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
484                tx.send(listener.local_addr().unwrap()).unwrap();
485                axum::serve(listener, app).await.unwrap();
486            });
487        });
488        rx.recv().unwrap()
489    }
490
491    /// Removes a walrus namespace dir on drop so test runs don't accrete state.
492    struct WalKeyGuard(String);
493    impl Drop for WalKeyGuard {
494        fn drop(&mut self) {
495            let _ = std::fs::remove_dir_all(format!("wal_files/{}", self.0));
496        }
497    }
498
499    #[test]
500    fn wal_replication_ships_leader_writes_to_standby_over_http() {
501        use crate::storage::TaelBackend;
502
503        // A standby tael-backend, served over HTTP so its
504        // /internal/wal/records endpoint is reachable. We keep a handle to
505        // assert its state directly.
506        let standby_dir = tempfile::tempdir().unwrap();
507        let standby_path = standby_dir.path().to_str().unwrap().to_string();
508        let standby_key = format!("tael-test-standby-{}", uuid::Uuid::new_v4());
509        let _sg = WalKeyGuard(standby_key.clone());
510        let standby = Arc::new(TaelBackend::with_wal_key(&standby_path, &standby_key).unwrap());
511        let standby_addr = serve_store(Arc::clone(&standby) as Arc<dyn Store>, standby_path);
512
513        // A leader that ships its WAL to the standby over HTTP, synchronously
514        // (required_acks = None ⇒ all standbys must ack before a write returns).
515        let leader_dir = tempfile::tempdir().unwrap();
516        let leader_key = format!("tael-test-leader-{}", uuid::Uuid::new_v4());
517        let _lg = WalKeyGuard(leader_key.clone());
518        let sink = Arc::new(RemoteWalSink::new(format!("http://{standby_addr}")).unwrap());
519        let leader = TaelBackend::with_wal_key_and_sinks(
520            leader_dir.path().to_str().unwrap(),
521            &leader_key,
522            vec![sink],
523            None,
524        )
525        .unwrap();
526
527        // Write to the leader only.
528        leader
529            .insert_spans(&[
530                test_span("t1", "s1", "api"),
531                test_span("t1", "s2", "db"),
532                test_span("t2", "s3", "api"),
533            ])
534            .unwrap();
535
536        // The standby received and applied the shipped records over HTTP.
537        assert_eq!(standby.get_trace("t1").unwrap().len(), 2);
538        assert_eq!(standby.get_trace("t2").unwrap().len(), 1);
539
540        // And it serves them over its own REST API (full path works).
541        let via_http = RemoteStore::new(format!("http://{standby_addr}")).unwrap();
542        assert_eq!(
543            via_http.query_traces(&TraceQuery::default()).unwrap().len(),
544            3
545        );
546    }
547}