Skip to main content

haystack_server/ops/
federation.rs

1//! Federation HTTP endpoints — status and sync for remote connectors.
2
3use actix_web::{HttpRequest, HttpResponse, web};
4
5use haystack_core::data::{HCol, HDict, HGrid};
6use haystack_core::kinds::{HDateTime, Kind, Number};
7
8use crate::connector::TransportMode;
9use crate::content;
10use crate::error::HaystackError;
11use crate::state::AppState;
12
13/// Column definitions for the federation status grid.
14fn status_columns() -> Vec<HCol> {
15    vec![
16        HCol::new("name"),
17        HCol::new("entityCount"),
18        HCol::new("transport"),
19        HCol::new("connected"),
20        HCol::new("lastSync"),
21    ]
22}
23
24/// GET /api/federation/status
25///
26/// Returns a grid with one row per connector: `name`, `entityCount`,
27/// `transport`, `connected`, and `lastSync`.
28pub async fn handle_status(
29    req: HttpRequest,
30    state: web::Data<AppState>,
31) -> Result<HttpResponse, HaystackError> {
32    let accept = req
33        .headers()
34        .get("Accept")
35        .and_then(|v| v.to_str().ok())
36        .unwrap_or("");
37
38    let connectors = &state.federation.connectors;
39
40    let grid = if connectors.is_empty() {
41        HGrid::from_parts(HDict::new(), status_columns(), vec![])
42    } else {
43        let rows: Vec<HDict> = connectors
44            .iter()
45            .map(|c| {
46                let mut row = HDict::new();
47                row.set("name", Kind::Str(c.config.name.clone()));
48                row.set(
49                    "entityCount",
50                    Kind::Number(Number::unitless(c.entity_count() as f64)),
51                );
52                let transport_str = match c.transport_mode() {
53                    TransportMode::Http => "http",
54                    TransportMode::WebSocket => "ws",
55                };
56                row.set("transport", Kind::Str(transport_str.to_string()));
57                row.set("connected", Kind::Bool(c.is_connected()));
58                let last_sync_kind = match c.last_sync_time() {
59                    Some(ts) => {
60                        let fixed = ts.fixed_offset();
61                        Kind::DateTime(HDateTime::new(fixed, "UTC"))
62                    }
63                    None => Kind::Null,
64                };
65                row.set("lastSync", last_sync_kind);
66                row
67            })
68            .collect();
69        HGrid::from_parts(HDict::new(), status_columns(), rows)
70    };
71
72    let (encoded, ct) = content::encode_response_grid(&grid, accept)
73        .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
74
75    Ok(HttpResponse::Ok().content_type(ct).body(encoded))
76}
77
78/// POST /api/federation/sync
79///
80/// Triggers `sync_all()` on all connectors. Returns a grid with `name`,
81/// `result` (entity count or error string), and `ok` (Bool).
82pub async fn handle_sync(
83    req: HttpRequest,
84    state: web::Data<AppState>,
85) -> Result<HttpResponse, HaystackError> {
86    let accept = req
87        .headers()
88        .get("Accept")
89        .and_then(|v| v.to_str().ok())
90        .unwrap_or("");
91
92    let results = state.federation.sync_all().await;
93
94    let rows: Vec<HDict> = results
95        .into_iter()
96        .map(|(name, result)| {
97            let mut row = HDict::new();
98            row.set("name", Kind::Str(name));
99            match result {
100                Ok(count) => {
101                    row.set("result", Kind::Str(format!("{count} entities")));
102                    row.set("ok", Kind::Bool(true));
103                }
104                Err(err) => {
105                    row.set("result", Kind::Str(err));
106                    row.set("ok", Kind::Bool(false));
107                }
108            }
109            row
110        })
111        .collect();
112
113    let grid = HGrid::from_parts(
114        HDict::new(),
115        vec![HCol::new("name"), HCol::new("result"), HCol::new("ok")],
116        rows,
117    );
118
119    let (encoded, ct) = content::encode_response_grid(&grid, accept)
120        .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
121
122    Ok(HttpResponse::Ok().content_type(ct).body(encoded))
123}
124
125/// POST /api/federation/sync/{name}
126///
127/// Triggers `sync_one(name)` on a single connector. Returns a grid with
128/// `name`, `result` (entity count or error string), and `ok` (Bool).
129pub async fn handle_sync_one(
130    req: HttpRequest,
131    state: web::Data<AppState>,
132    path: web::Path<String>,
133) -> Result<HttpResponse, HaystackError> {
134    let accept = req
135        .headers()
136        .get("Accept")
137        .and_then(|v| v.to_str().ok())
138        .unwrap_or("");
139
140    let name = path.into_inner();
141    let result = state.federation.sync_one(&name).await;
142
143    let mut row = HDict::new();
144    row.set("name", Kind::Str(name));
145    match result {
146        Ok(count) => {
147            row.set("result", Kind::Str(format!("{count} entities")));
148            row.set("ok", Kind::Bool(true));
149        }
150        Err(err) => {
151            row.set("result", Kind::Str(err));
152            row.set("ok", Kind::Bool(false));
153        }
154    }
155
156    let grid = HGrid::from_parts(
157        HDict::new(),
158        vec![HCol::new("name"), HCol::new("result"), HCol::new("ok")],
159        vec![row],
160    );
161
162    let (encoded, ct) = content::encode_response_grid(&grid, accept)
163        .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
164
165    Ok(HttpResponse::Ok().content_type(ct).body(encoded))
166}