Skip to main content

haystack_server/ops/
federation.rs

1//! Federation HTTP endpoints — status and sync for remote connectors.
2//!
3//! # Overview
4//!
5//! These admin endpoints expose the state of federation connectors and
6//! allow manual sync triggers.
7//!
8//! # Endpoints
9//!
10//! ## `GET /api/federation/status`
11//!
12//! No request grid. Response columns:
13//!
14//! | Column          | Kind     | Description                         |
15//! |-----------------|----------|-------------------------------------|
16//! | `name`          | Str      | Connector display name              |
17//! | `entityCount`   | Number   | Cached entity count                 |
18//! | `transport`     | Str      | `"http"` or `"ws"`                  |
19//! | `connected`     | Bool     | Whether last sync succeeded         |
20//! | `lastSync`      | DateTime | Timestamp of last successful sync   |
21//! | `cacheVersion`  | Number   | Monotonic cache version counter     |
22//! | `stalenessSecs` | Number   | Seconds since last successful sync  |
23//!
24//! ## `POST /api/federation/sync`
25//!
26//! Triggers all connectors. Response columns:
27//!
28//! | Column   | Kind | Description                     |
29//! |----------|------|---------------------------------|
30//! | `name`   | Str  | Connector name                  |
31//! | `result` | Str  | Entity count or error message   |
32//! | `ok`     | Bool | Whether sync succeeded          |
33//!
34//! ## `POST /api/federation/sync/{name}`
35//!
36//! Same response as sync-all but for a single named connector.
37//!
38//! # Errors
39//!
40//! - **500 Internal Server Error** — encoding failure or sync error.
41
42use actix_web::{HttpRequest, HttpResponse, web};
43
44use haystack_core::data::{HCol, HDict, HGrid};
45use haystack_core::kinds::{HDateTime, Kind, Number};
46
47use crate::connector::TransportMode;
48use crate::content;
49use crate::error::HaystackError;
50use crate::state::AppState;
51
52/// Column definitions for the federation status grid.
53fn status_columns() -> Vec<HCol> {
54    vec![
55        HCol::new("name"),
56        HCol::new("entityCount"),
57        HCol::new("transport"),
58        HCol::new("connected"),
59        HCol::new("lastSync"),
60        HCol::new("cacheVersion"),
61        HCol::new("stalenessSecs"),
62    ]
63}
64
65/// GET /api/federation/status
66///
67/// Returns a grid with one row per connector: `name`, `entityCount`,
68/// `transport`, `connected`, and `lastSync`.
69pub async fn handle_status(
70    req: HttpRequest,
71    state: web::Data<AppState>,
72) -> Result<HttpResponse, HaystackError> {
73    let accept = req
74        .headers()
75        .get("Accept")
76        .and_then(|v| v.to_str().ok())
77        .unwrap_or("");
78
79    let connectors = &state.federation.connectors;
80
81    let grid = if connectors.is_empty() {
82        HGrid::from_parts(HDict::new(), status_columns(), vec![])
83    } else {
84        let rows: Vec<HDict> = connectors
85            .iter()
86            .map(|c| {
87                let st = c.state();
88                let mut row = HDict::new();
89                row.set("name", Kind::Str(c.config.name.clone()));
90                row.set(
91                    "entityCount",
92                    Kind::Number(Number::unitless(st.entity_count as f64)),
93                );
94                let transport_str = match c.transport_mode() {
95                    TransportMode::Http => "http",
96                    TransportMode::WebSocket => "ws",
97                };
98                row.set("transport", Kind::Str(transport_str.to_string()));
99                row.set("connected", Kind::Bool(st.connected));
100                let last_sync_kind = match c.last_sync_time() {
101                    Some(ts) => {
102                        let fixed = ts.fixed_offset();
103                        Kind::DateTime(HDateTime::new(fixed, "UTC"))
104                    }
105                    None => Kind::Null,
106                };
107                row.set("lastSync", last_sync_kind);
108                row.set(
109                    "cacheVersion",
110                    Kind::Number(Number::unitless(st.cache_version as f64)),
111                );
112                match st.staleness_secs {
113                    Some(s) => row.set("stalenessSecs", Kind::Number(Number::unitless(s))),
114                    None => row.set("stalenessSecs", Kind::Null),
115                };
116                row
117            })
118            .collect();
119        HGrid::from_parts(HDict::new(), status_columns(), rows)
120    };
121
122    let (encoded, ct) = content::encode_response_grid(&grid, accept)
123        .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
124
125    Ok(HttpResponse::Ok().content_type(ct).body(encoded))
126}
127
128/// POST /api/federation/sync
129///
130/// Triggers `sync_all()` on all connectors. Returns a grid with `name`,
131/// `result` (entity count or error string), and `ok` (Bool).
132pub async fn handle_sync(
133    req: HttpRequest,
134    state: web::Data<AppState>,
135) -> Result<HttpResponse, HaystackError> {
136    let accept = req
137        .headers()
138        .get("Accept")
139        .and_then(|v| v.to_str().ok())
140        .unwrap_or("");
141
142    let results = state.federation.sync_all().await;
143
144    let rows: Vec<HDict> = results
145        .into_iter()
146        .map(|(name, result)| {
147            let mut row = HDict::new();
148            row.set("name", Kind::Str(name));
149            match result {
150                Ok(count) => {
151                    row.set("result", Kind::Str(format!("{count} entities")));
152                    row.set("ok", Kind::Bool(true));
153                }
154                Err(err) => {
155                    row.set("result", Kind::Str(err));
156                    row.set("ok", Kind::Bool(false));
157                }
158            }
159            row
160        })
161        .collect();
162
163    let grid = HGrid::from_parts(
164        HDict::new(),
165        vec![HCol::new("name"), HCol::new("result"), HCol::new("ok")],
166        rows,
167    );
168
169    let (encoded, ct) = content::encode_response_grid(&grid, accept)
170        .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
171
172    Ok(HttpResponse::Ok().content_type(ct).body(encoded))
173}
174
175/// POST /api/federation/sync/{name}
176///
177/// Triggers `sync_one(name)` on a single connector. Returns a grid with
178/// `name`, `result` (entity count or error string), and `ok` (Bool).
179pub async fn handle_sync_one(
180    req: HttpRequest,
181    state: web::Data<AppState>,
182    path: web::Path<String>,
183) -> Result<HttpResponse, HaystackError> {
184    let accept = req
185        .headers()
186        .get("Accept")
187        .and_then(|v| v.to_str().ok())
188        .unwrap_or("");
189
190    let name = path.into_inner();
191    let result = state.federation.sync_one(&name).await;
192
193    let mut row = HDict::new();
194    row.set("name", Kind::Str(name));
195    match result {
196        Ok(count) => {
197            row.set("result", Kind::Str(format!("{count} entities")));
198            row.set("ok", Kind::Bool(true));
199        }
200        Err(err) => {
201            row.set("result", Kind::Str(err));
202            row.set("ok", Kind::Bool(false));
203        }
204    }
205
206    let grid = HGrid::from_parts(
207        HDict::new(),
208        vec![HCol::new("name"), HCol::new("result"), HCol::new("ok")],
209        vec![row],
210    );
211
212    let (encoded, ct) = content::encode_response_grid(&grid, accept)
213        .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
214
215    Ok(HttpResponse::Ok().content_type(ct).body(encoded))
216}