haystack_server/ops/
federation.rs1use actix_web::{HttpRequest, HttpResponse, web};
41
42use haystack_core::data::{HCol, HDict, HGrid};
43use haystack_core::kinds::{HDateTime, Kind, Number};
44
45use crate::connector::TransportMode;
46use crate::content;
47use crate::error::HaystackError;
48use crate::state::AppState;
49
50fn status_columns() -> Vec<HCol> {
52 vec![
53 HCol::new("name"),
54 HCol::new("entityCount"),
55 HCol::new("transport"),
56 HCol::new("connected"),
57 HCol::new("lastSync"),
58 ]
59}
60
61pub async fn handle_status(
66 req: HttpRequest,
67 state: web::Data<AppState>,
68) -> Result<HttpResponse, HaystackError> {
69 let accept = req
70 .headers()
71 .get("Accept")
72 .and_then(|v| v.to_str().ok())
73 .unwrap_or("");
74
75 let connectors = &state.federation.connectors;
76
77 let grid = if connectors.is_empty() {
78 HGrid::from_parts(HDict::new(), status_columns(), vec![])
79 } else {
80 let rows: Vec<HDict> = connectors
81 .iter()
82 .map(|c| {
83 let mut row = HDict::new();
84 row.set("name", Kind::Str(c.config.name.clone()));
85 row.set(
86 "entityCount",
87 Kind::Number(Number::unitless(c.entity_count() as f64)),
88 );
89 let transport_str = match c.transport_mode() {
90 TransportMode::Http => "http",
91 TransportMode::WebSocket => "ws",
92 };
93 row.set("transport", Kind::Str(transport_str.to_string()));
94 row.set("connected", Kind::Bool(c.is_connected()));
95 let last_sync_kind = match c.last_sync_time() {
96 Some(ts) => {
97 let fixed = ts.fixed_offset();
98 Kind::DateTime(HDateTime::new(fixed, "UTC"))
99 }
100 None => Kind::Null,
101 };
102 row.set("lastSync", last_sync_kind);
103 row
104 })
105 .collect();
106 HGrid::from_parts(HDict::new(), status_columns(), rows)
107 };
108
109 let (encoded, ct) = content::encode_response_grid(&grid, accept)
110 .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
111
112 Ok(HttpResponse::Ok().content_type(ct).body(encoded))
113}
114
115pub async fn handle_sync(
120 req: HttpRequest,
121 state: web::Data<AppState>,
122) -> Result<HttpResponse, HaystackError> {
123 let accept = req
124 .headers()
125 .get("Accept")
126 .and_then(|v| v.to_str().ok())
127 .unwrap_or("");
128
129 let results = state.federation.sync_all().await;
130
131 let rows: Vec<HDict> = results
132 .into_iter()
133 .map(|(name, result)| {
134 let mut row = HDict::new();
135 row.set("name", Kind::Str(name));
136 match result {
137 Ok(count) => {
138 row.set("result", Kind::Str(format!("{count} entities")));
139 row.set("ok", Kind::Bool(true));
140 }
141 Err(err) => {
142 row.set("result", Kind::Str(err));
143 row.set("ok", Kind::Bool(false));
144 }
145 }
146 row
147 })
148 .collect();
149
150 let grid = HGrid::from_parts(
151 HDict::new(),
152 vec![HCol::new("name"), HCol::new("result"), HCol::new("ok")],
153 rows,
154 );
155
156 let (encoded, ct) = content::encode_response_grid(&grid, accept)
157 .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
158
159 Ok(HttpResponse::Ok().content_type(ct).body(encoded))
160}
161
162pub async fn handle_sync_one(
167 req: HttpRequest,
168 state: web::Data<AppState>,
169 path: web::Path<String>,
170) -> Result<HttpResponse, HaystackError> {
171 let accept = req
172 .headers()
173 .get("Accept")
174 .and_then(|v| v.to_str().ok())
175 .unwrap_or("");
176
177 let name = path.into_inner();
178 let result = state.federation.sync_one(&name).await;
179
180 let mut row = HDict::new();
181 row.set("name", Kind::Str(name));
182 match result {
183 Ok(count) => {
184 row.set("result", Kind::Str(format!("{count} entities")));
185 row.set("ok", Kind::Bool(true));
186 }
187 Err(err) => {
188 row.set("result", Kind::Str(err));
189 row.set("ok", Kind::Bool(false));
190 }
191 }
192
193 let grid = HGrid::from_parts(
194 HDict::new(),
195 vec![HCol::new("name"), HCol::new("result"), HCol::new("ok")],
196 vec![row],
197 );
198
199 let (encoded, ct) = content::encode_response_grid(&grid, accept)
200 .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
201
202 Ok(HttpResponse::Ok().content_type(ct).body(encoded))
203}