1use anyhow::{Context, Result, anyhow, bail};
24use reqwest::StatusCode;
25use reqwest::blocking::Client;
26use serde::de::DeserializeOwned;
27use serde_json::Value;
28use std::time::Duration;
29
30use super::Store;
31use super::backend::WalSink;
32use super::models::{
33 AnomalyReport, CorrelateReport, LogQuery, LogRecord, MetricPoint, MetricQuery, ServiceInfo,
34 Span, SummaryReport, TraceComment, TraceQuery,
35};
36
37pub struct RemoteStore {
39 base_url: String,
40 http: Client,
41}
42
43impl RemoteStore {
44 pub fn new(base_url: impl Into<String>) -> Result<Self> {
46 Self::with_timeout(base_url, Duration::from_secs(30))
47 }
48
49 pub fn with_timeout(base_url: impl Into<String>, timeout: Duration) -> Result<Self> {
51 let http = Client::builder()
52 .timeout(timeout)
53 .build()
54 .context("building RemoteStore HTTP client")?;
55 Ok(Self {
56 base_url: base_url.into().trim_end_matches('/').to_string(),
57 http,
58 })
59 }
60
61 fn send_get(
62 &self,
63 path: &str,
64 params: &[(&str, String)],
65 ) -> Result<reqwest::blocking::Response> {
66 self.http
67 .get(format!("{}{path}", self.base_url))
68 .query(params)
69 .send()
70 .with_context(|| format!("GET {path} from {}", self.base_url))
71 }
72
73 fn get_json(&self, path: &str, params: &[(&str, String)]) -> Result<Value> {
75 let resp = self
76 .send_get(path, params)?
77 .error_for_status()
78 .with_context(|| format!("GET {path} from {}", self.base_url))?;
79 resp.json::<Value>()
80 .with_context(|| format!("decoding {path} response from {}", self.base_url))
81 }
82}
83
84fn field<T: DeserializeOwned>(mut value: Value, key: &str) -> Result<T> {
87 let v = value
88 .get_mut(key)
89 .map(Value::take)
90 .ok_or_else(|| anyhow!("response missing `{key}` field"))?;
91 serde_json::from_value(v).with_context(|| format!("deserializing `{key}`"))
92}
93
/// Appends a `last` query parameter to `params` when `last_seconds` is set.
///
/// `params` is left untouched when `last_seconds` is `None`.
fn last_param(params: &mut Vec<(&'static str, String)>, last_seconds: Option<i64>) {
    // An `Option` iterates over zero or one items, so `extend` pushes at most once.
    params.extend(last_seconds.map(|seconds| ("last", seconds.to_string())));
}
101
102impl Store for RemoteStore {
103 fn insert_spans(&self, _spans: &[Span]) -> Result<()> {
105 bail!("RemoteStore is read-only: route span ingest to the owning shard via OTLP");
106 }
107
108 fn query_traces(&self, query: &TraceQuery) -> Result<Vec<Span>> {
109 let mut params: Vec<(&str, String)> = Vec::new();
110 if let Some(ref s) = query.service {
111 params.push(("service", s.clone()));
112 }
113 if let Some(ref o) = query.operation {
114 params.push(("operation", o.clone()));
115 }
116 if let Some(d) = query.min_duration_ms {
117 params.push(("min_duration_ms", d.to_string()));
118 }
119 if let Some(d) = query.max_duration_ms {
120 params.push(("max_duration_ms", d.to_string()));
121 }
122 if let Some(ref s) = query.status {
123 params.push(("status", s.clone()));
124 }
125 last_param(&mut params, query.last_seconds);
126 if let Some(l) = query.limit {
127 params.push(("limit", l.to_string()));
128 }
129 for (k, v) in &query.attributes {
130 params.push(("attribute", format!("{k}={v}")));
131 }
132 if let Some(ref t) = query.text {
133 params.push(("text", t.clone()));
134 }
135 let body = self.get_json("/api/v1/traces", ¶ms)?;
136 field(body, "spans")
137 }
138
139 fn get_trace(&self, trace_id: &str) -> Result<Vec<Span>> {
140 let resp = self.send_get(&format!("/api/v1/traces/{trace_id}"), &[])?;
144 match resp.status() {
145 StatusCode::NOT_FOUND => Ok(Vec::new()),
146 s if s.is_success() => {
147 let body = resp
148 .json::<Value>()
149 .context("decoding get_trace response")?;
150 field(body, "spans")
151 }
152 s => bail!("get_trace {trace_id} on {}: HTTP {s}", self.base_url),
153 }
154 }
155
156 fn list_services(&self) -> Result<Vec<ServiceInfo>> {
157 let body = self.get_json("/api/v1/services", &[])?;
158 field(body, "services")
159 }
160
161 fn add_comment(
163 &self,
164 trace_id: &str,
165 span_id: Option<&str>,
166 author: &str,
167 body: &str,
168 ) -> Result<TraceComment> {
169 let mut payload = serde_json::json!({ "author": author, "body": body });
170 if let Some(s) = span_id {
171 payload["span_id"] = serde_json::json!(s);
172 }
173 let resp = self
174 .http
175 .post(format!(
176 "{}/api/v1/traces/{trace_id}/comments",
177 self.base_url
178 ))
179 .json(&payload)
180 .send()
181 .with_context(|| format!("POST comment to {}", self.base_url))?
182 .error_for_status()
183 .with_context(|| format!("POST comment to {}", self.base_url))?
184 .json::<Value>()
185 .context("decoding add_comment response")?;
186 field(resp, "comment")
187 }
188
189 fn get_comments(&self, trace_id: &str) -> Result<Vec<TraceComment>> {
190 let body = self.get_json(&format!("/api/v1/traces/{trace_id}/comments"), &[])?;
191 field(body, "comments")
192 }
193
194 fn insert_logs(&self, _logs: &[LogRecord]) -> Result<()> {
196 bail!("RemoteStore is read-only: route log ingest to the owning shard via OTLP");
197 }
198
199 fn query_logs(&self, query: &LogQuery) -> Result<Vec<LogRecord>> {
200 let mut params: Vec<(&str, String)> = Vec::new();
201 if let Some(ref s) = query.service {
202 params.push(("service", s.clone()));
203 }
204 if let Some(ref s) = query.severity {
205 params.push(("severity", s.clone()));
206 }
207 if let Some(ref b) = query.body_contains {
208 params.push(("body_contains", b.clone()));
209 }
210 if let Some(ref t) = query.trace_id {
211 params.push(("trace_id", t.clone()));
212 }
213 last_param(&mut params, query.last_seconds);
214 if let Some(l) = query.limit {
215 params.push(("limit", l.to_string()));
216 }
217 let body = self.get_json("/api/v1/logs", ¶ms)?;
218 field(body, "logs")
219 }
220
221 fn insert_metrics(&self, _metrics: &[MetricPoint]) -> Result<()> {
223 bail!("RemoteStore is read-only: route metric ingest to the owning shard via OTLP");
224 }
225
226 fn query_metrics(&self, query: &MetricQuery) -> Result<Vec<MetricPoint>> {
227 let mut params: Vec<(&str, String)> = Vec::new();
228 if let Some(ref s) = query.service {
229 params.push(("service", s.clone()));
230 }
231 if let Some(ref n) = query.name {
232 params.push(("name", n.clone()));
233 }
234 if let Some(ref t) = query.metric_type {
235 params.push(("metric_type", t.clone()));
236 }
237 last_param(&mut params, query.last_seconds);
238 if let Some(l) = query.limit {
239 params.push(("limit", l.to_string()));
240 }
241 let body = self.get_json("/api/v1/metrics", ¶ms)?;
242 field(body, "metrics")
243 }
244
245 fn query_summary(&self, last_seconds: i64, service: Option<&str>) -> Result<SummaryReport> {
247 let mut params: Vec<(&str, String)> = vec![("last", last_seconds.to_string())];
248 if let Some(s) = service {
249 params.push(("service", s.to_string()));
250 }
251 let body = self.get_json("/api/v1/summary", ¶ms)?;
252 serde_json::from_value(body).context("deserializing SummaryReport")
253 }
254
255 fn query_anomalies(
256 &self,
257 current_seconds: i64,
258 baseline_seconds: i64,
259 service: Option<&str>,
260 ) -> Result<AnomalyReport> {
261 let mut params: Vec<(&str, String)> = vec![
262 ("last", current_seconds.to_string()),
263 ("baseline", baseline_seconds.to_string()),
264 ];
265 if let Some(s) = service {
266 params.push(("service", s.to_string()));
267 }
268 let body = self.get_json("/api/v1/anomalies", ¶ms)?;
269 serde_json::from_value(body).context("deserializing AnomalyReport")
270 }
271
272 fn query_correlate(&self, trace_id: &str) -> Result<Option<CorrelateReport>> {
273 let resp = self.send_get("/api/v1/correlate", &[("trace", trace_id.to_string())])?;
274 match resp.status() {
275 StatusCode::NOT_FOUND => Ok(None),
276 s if s.is_success() => {
277 let body = resp
278 .json::<Value>()
279 .context("decoding correlate response")?;
280 Ok(Some(
281 serde_json::from_value(body).context("deserializing CorrelateReport")?,
282 ))
283 }
284 s => bail!("correlate {trace_id} on {}: HTTP {s}", self.base_url),
285 }
286 }
287
288 fn query_sql(&self, sql: &str) -> Result<Vec<Value>> {
289 let body = self.get_json("/api/v1/sql", &[("q", sql.to_string())])?;
290 field(body, "rows")
291 }
292
293 fn health(&self) -> Result<()> {
295 let resp = self
296 .send_get("/healthz", &[])?
297 .error_for_status()
298 .with_context(|| format!("health check on {}", self.base_url))?;
299 let _ = resp.text();
300 Ok(())
301 }
302}
303
304pub const WAL_EPOCH_HEADER: &str = "x-tael-wal-epoch";
313
314pub struct RemoteWalSink {
315 url: String,
316 http: Client,
317 epoch: Option<std::sync::Arc<std::sync::atomic::AtomicU64>>,
320}
321
322impl RemoteWalSink {
323 pub fn new(base_url: impl Into<String>) -> Result<Self> {
325 Self::build(base_url, None)
326 }
327
328 pub fn with_epoch(
331 base_url: impl Into<String>,
332 epoch: std::sync::Arc<std::sync::atomic::AtomicU64>,
333 ) -> Result<Self> {
334 Self::build(base_url, Some(epoch))
335 }
336
337 fn build(
338 base_url: impl Into<String>,
339 epoch: Option<std::sync::Arc<std::sync::atomic::AtomicU64>>,
340 ) -> Result<Self> {
341 let http = Client::builder()
342 .timeout(Duration::from_secs(30))
343 .build()
344 .context("building RemoteWalSink HTTP client")?;
345 let base = base_url.into().trim_end_matches('/').to_string();
346 Ok(Self {
347 url: format!("{base}/internal/wal/records"),
348 http,
349 epoch,
350 })
351 }
352}
353
354impl WalSink for RemoteWalSink {
355 fn append_framed(&self, framed: &[u8]) -> Result<()> {
356 let mut req = self
357 .http
358 .post(&self.url)
359 .header("content-type", "application/octet-stream");
360 if let Some(epoch) = &self.epoch {
361 req = req.header(
362 WAL_EPOCH_HEADER,
363 epoch.load(std::sync::atomic::Ordering::Acquire).to_string(),
364 );
365 }
366 req.body(framed.to_vec())
367 .send()
368 .with_context(|| format!("shipping WAL record to {}", self.url))?
369 .error_for_status()
370 .with_context(|| format!("standby {} rejected WAL record", self.url))?;
371 Ok(())
372 }
373
374 fn name(&self) -> &str {
375 &self.url
376 }
377}
378
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::models::{SpanKind, SpanStatus};
    use chrono::Utc;
    use std::collections::HashMap;
    use std::net::SocketAddr;
    use std::sync::Arc;

    /// Builds a minimal span whose start and end are both "now".
    fn test_span(trace: &str, sid: &str, svc: &str) -> Span {
        let stamp = Utc::now();
        Span {
            trace_id: trace.into(),
            span_id: sid.into(),
            parent_span_id: None,
            service: svc.into(),
            operation: "op".into(),
            start_time: stamp,
            end_time: stamp,
            duration_ms: 12.0,
            status: SpanStatus::Ok,
            attributes: HashMap::new(),
            events: Vec::new(),
            kind: SpanKind::Internal,
            llm: None,
        }
    }

    /// Builds the REST router around `store`, binds an ephemeral localhost
    /// port, reports the bound address through `addr_tx`, then serves until
    /// the surrounding runtime goes away.
    async fn run_rest_server(
        store: Arc<dyn Store>,
        data_dir: String,
        addr_tx: std::sync::mpsc::Sender<SocketAddr>,
    ) {
        let blobs = Arc::new(crate::storage::BlobStore::new(&data_dir).unwrap());
        let bus = Arc::new(crate::span_bus::SpanBus::new().unwrap());
        let log_bus = Arc::new(crate::log_bus::LogBus::new().unwrap());
        let app = crate::api::rest::router(store, blobs, bus, log_bus, None);
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        addr_tx.send(listener.local_addr().unwrap()).unwrap();
        axum::serve(listener, app).await.unwrap();
    }

    /// Starts a REST server on a background thread, backed by a fresh store
    /// pre-loaded with `spans`, and returns the address it listens on.
    fn serve_with(spans: Vec<Span>) -> SocketAddr {
        let data_dir = tempfile::tempdir().unwrap();
        let data_path = data_dir.path().to_str().unwrap().to_string();
        let (addr_tx, addr_rx) = std::sync::mpsc::channel();
        std::thread::spawn(move || {
            // Owning the TempDir here keeps the directory for as long as the
            // server thread runs.
            let _keep_alive = data_dir;
            let runtime = tokio::runtime::Runtime::new().unwrap();
            runtime.block_on(async move {
                let wal_key = format!("tael-test-remote-{}", uuid::Uuid::new_v4());
                let backend =
                    crate::storage::TaelBackend::with_wal_key(&data_path, &wal_key).unwrap();
                let store: Arc<dyn Store> = Arc::new(backend);
                store.insert_spans(&spans).unwrap();
                run_rest_server(store, data_path, addr_tx).await;
            });
        });
        addr_rx.recv().unwrap()
    }

    #[test]
    fn remote_store_roundtrips_reads_over_http() {
        let seeded = vec![
            test_span("t1", "s1", "api"),
            test_span("t1", "s2", "db"),
            test_span("t2", "s3", "api"),
        ];
        let addr = serve_with(seeded);
        let remote = RemoteStore::new(format!("http://{addr}")).unwrap();

        remote.health().expect("health");

        let all_spans = remote.query_traces(&TraceQuery::default()).unwrap();
        assert_eq!(all_spans.len(), 3, "all spans returned over HTTP");

        let first_trace = remote.get_trace("t1").unwrap();
        assert_eq!(first_trace.len(), 2);
        assert!(first_trace.iter().all(|span| span.trace_id == "t1"));

        // An unknown trace comes back as an empty list, not an error.
        let absent = remote.get_trace("missing").unwrap();
        assert!(absent.is_empty());

        let services = remote.list_services().unwrap();
        assert!(services.iter().any(|svc| svc.name == "api"));

        // Writes through the remote store are rejected.
        let write_attempt = remote.insert_spans(&[test_span("t3", "s4", "api")]);
        assert!(write_attempt.is_err());
    }

    /// Starts a REST server on a background thread around an existing `store`
    /// and returns the address it listens on.
    fn serve_store(store: Arc<dyn Store>, data_dir: String) -> SocketAddr {
        let (addr_tx, addr_rx) = std::sync::mpsc::channel();
        std::thread::spawn(move || {
            let runtime = tokio::runtime::Runtime::new().unwrap();
            runtime.block_on(run_rest_server(store, data_dir, addr_tx));
        });
        addr_rx.recv().unwrap()
    }

    /// Removes `wal_files/<key>` when dropped; removal errors are ignored.
    struct WalKeyGuard(String);

    impl Drop for WalKeyGuard {
        fn drop(&mut self) {
            let wal_dir = format!("wal_files/{}", self.0);
            let _ = std::fs::remove_dir_all(wal_dir);
        }
    }

    #[test]
    fn wal_replication_ships_leader_writes_to_standby_over_http() {
        use crate::storage::TaelBackend;

        // Standby: a backend with its own WAL key, exposed over HTTP.
        let standby_dir = tempfile::tempdir().unwrap();
        let standby_path = standby_dir.path().to_str().unwrap().to_string();
        let standby_key = format!("tael-test-standby-{}", uuid::Uuid::new_v4());
        let _standby_guard = WalKeyGuard(standby_key.clone());
        let standby = Arc::new(TaelBackend::with_wal_key(&standby_path, &standby_key).unwrap());
        let standby_addr = serve_store(Arc::clone(&standby) as Arc<dyn Store>, standby_path);

        // Leader: ships its WAL records to the standby through a RemoteWalSink.
        let leader_dir = tempfile::tempdir().unwrap();
        let leader_key = format!("tael-test-leader-{}", uuid::Uuid::new_v4());
        let _leader_guard = WalKeyGuard(leader_key.clone());
        let sink = Arc::new(RemoteWalSink::new(format!("http://{standby_addr}")).unwrap());
        let leader = TaelBackend::with_wal_key_and_sinks(
            leader_dir.path().to_str().unwrap(),
            &leader_key,
            vec![sink],
            None,
        )
        .unwrap();

        let written = [
            test_span("t1", "s1", "api"),
            test_span("t1", "s2", "db"),
            test_span("t2", "s3", "api"),
        ];
        leader.insert_spans(&written).unwrap();

        // The standby sees the leader's writes directly...
        assert_eq!(standby.get_trace("t1").unwrap().len(), 2);
        assert_eq!(standby.get_trace("t2").unwrap().len(), 1);

        // ...and through its HTTP read API.
        let via_http = RemoteStore::new(format!("http://{standby_addr}")).unwrap();
        let replicated = via_http.query_traces(&TraceQuery::default()).unwrap();
        assert_eq!(replicated.len(), 3);
    }
}