haystack_server/ops/
federation.rs1use actix_web::{HttpRequest, HttpResponse, web};
4
5use haystack_core::data::{HCol, HDict, HGrid};
6use haystack_core::kinds::{HDateTime, Kind, Number};
7
8use crate::connector::TransportMode;
9use crate::content;
10use crate::error::HaystackError;
11use crate::state::AppState;
12
13fn status_columns() -> Vec<HCol> {
15 vec![
16 HCol::new("name"),
17 HCol::new("entityCount"),
18 HCol::new("transport"),
19 HCol::new("connected"),
20 HCol::new("lastSync"),
21 ]
22}
23
24pub async fn handle_status(
29 req: HttpRequest,
30 state: web::Data<AppState>,
31) -> Result<HttpResponse, HaystackError> {
32 let accept = req
33 .headers()
34 .get("Accept")
35 .and_then(|v| v.to_str().ok())
36 .unwrap_or("");
37
38 let connectors = &state.federation.connectors;
39
40 let grid = if connectors.is_empty() {
41 HGrid::from_parts(HDict::new(), status_columns(), vec![])
42 } else {
43 let rows: Vec<HDict> = connectors
44 .iter()
45 .map(|c| {
46 let mut row = HDict::new();
47 row.set("name", Kind::Str(c.config.name.clone()));
48 row.set(
49 "entityCount",
50 Kind::Number(Number::unitless(c.entity_count() as f64)),
51 );
52 let transport_str = match c.transport_mode() {
53 TransportMode::Http => "http",
54 TransportMode::WebSocket => "ws",
55 };
56 row.set("transport", Kind::Str(transport_str.to_string()));
57 row.set("connected", Kind::Bool(c.is_connected()));
58 let last_sync_kind = match c.last_sync_time() {
59 Some(ts) => {
60 let fixed = ts.fixed_offset();
61 Kind::DateTime(HDateTime::new(fixed, "UTC"))
62 }
63 None => Kind::Null,
64 };
65 row.set("lastSync", last_sync_kind);
66 row
67 })
68 .collect();
69 HGrid::from_parts(HDict::new(), status_columns(), rows)
70 };
71
72 let (encoded, ct) = content::encode_response_grid(&grid, accept)
73 .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
74
75 Ok(HttpResponse::Ok().content_type(ct).body(encoded))
76}
77
78pub async fn handle_sync(
83 req: HttpRequest,
84 state: web::Data<AppState>,
85) -> Result<HttpResponse, HaystackError> {
86 let accept = req
87 .headers()
88 .get("Accept")
89 .and_then(|v| v.to_str().ok())
90 .unwrap_or("");
91
92 let results = state.federation.sync_all().await;
93
94 let rows: Vec<HDict> = results
95 .into_iter()
96 .map(|(name, result)| {
97 let mut row = HDict::new();
98 row.set("name", Kind::Str(name));
99 match result {
100 Ok(count) => {
101 row.set("result", Kind::Str(format!("{count} entities")));
102 row.set("ok", Kind::Bool(true));
103 }
104 Err(err) => {
105 row.set("result", Kind::Str(err));
106 row.set("ok", Kind::Bool(false));
107 }
108 }
109 row
110 })
111 .collect();
112
113 let grid = HGrid::from_parts(
114 HDict::new(),
115 vec![HCol::new("name"), HCol::new("result"), HCol::new("ok")],
116 rows,
117 );
118
119 let (encoded, ct) = content::encode_response_grid(&grid, accept)
120 .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
121
122 Ok(HttpResponse::Ok().content_type(ct).body(encoded))
123}
124
125pub async fn handle_sync_one(
130 req: HttpRequest,
131 state: web::Data<AppState>,
132 path: web::Path<String>,
133) -> Result<HttpResponse, HaystackError> {
134 let accept = req
135 .headers()
136 .get("Accept")
137 .and_then(|v| v.to_str().ok())
138 .unwrap_or("");
139
140 let name = path.into_inner();
141 let result = state.federation.sync_one(&name).await;
142
143 let mut row = HDict::new();
144 row.set("name", Kind::Str(name));
145 match result {
146 Ok(count) => {
147 row.set("result", Kind::Str(format!("{count} entities")));
148 row.set("ok", Kind::Bool(true));
149 }
150 Err(err) => {
151 row.set("result", Kind::Str(err));
152 row.set("ok", Kind::Bool(false));
153 }
154 }
155
156 let grid = HGrid::from_parts(
157 HDict::new(),
158 vec![HCol::new("name"), HCol::new("result"), HCol::new("ok")],
159 vec![row],
160 );
161
162 let (encoded, ct) = content::encode_response_grid(&grid, accept)
163 .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
164
165 Ok(HttpResponse::Ok().content_type(ct).body(encoded))
166}