1use crate::core::HsiSnapshot;
19use crate::gateway::GatewayConfig;
20use crate::gateway::{BehavioralSession as GatewayBehavioralSession, SessionMeta, SessionPayload};
21use axum::{
22 extract::State,
23 http::{HeaderValue, StatusCode},
24 routing::{get, post},
25 Json, Router,
26};
27use serde::{Deserialize, Serialize};
28use std::net::SocketAddr;
29use std::path::PathBuf;
30use std::sync::Arc;
31use synheart_flux::BehaviorProcessor;
32use tokio::net::TcpListener;
33use tokio::sync::RwLock;
34use tower_http::cors::{Any, CorsLayer};
35
36#[derive(Debug, Clone)]
38pub struct ServerConfig {
39 pub port: u16,
41 pub gateway_config: GatewayConfig,
43 pub state_dir: PathBuf,
45}
46
47impl ServerConfig {
48 pub fn new(port: u16, gateway_config: GatewayConfig, state_dir: PathBuf) -> Self {
50 Self {
51 port,
52 gateway_config,
53 state_dir,
54 }
55 }
56}
57
58pub struct ServerState {
60 processor: RwLock<BehaviorProcessor>,
62 gateway_config: GatewayConfig,
64 http_client: reqwest::Client,
66 state_dir: PathBuf,
68}
69
70impl ServerState {
71 pub fn new(config: &ServerConfig) -> Self {
73 let mut processor = BehaviorProcessor::new();
74
75 let baseline_path = config
77 .state_dir
78 .join("state")
79 .join("behavior_baselines.json");
80 if baseline_path.exists() {
81 if let Ok(json) = std::fs::read_to_string(&baseline_path) {
82 if let Err(e) = processor.load_baselines(&json) {
83 tracing::warn!("Failed to load baselines: {}", e);
84 }
85 }
86 }
87
88 Self {
89 processor: RwLock::new(processor),
90 gateway_config: config.gateway_config.clone(),
91 http_client: reqwest::Client::builder()
92 .timeout(std::time::Duration::from_secs(10))
93 .build()
94 .expect("Failed to create HTTP client"),
95 state_dir: config.state_dir.clone(),
96 }
97 }
98
99 async fn save_baselines(&self) {
101 let baseline_dir = self.state_dir.join("state");
102 let _ = std::fs::create_dir_all(&baseline_dir);
103
104 let processor = self.processor.read().await;
105 if let Ok(json) = processor.save_baselines() {
106 let path = baseline_dir.join("behavior_baselines.json");
107 if let Err(e) = std::fs::write(&path, json) {
108 tracing::warn!("Failed to save baselines: {}", e);
109 }
110 }
111 }
112}
113
114#[derive(Debug, Clone, Serialize, Deserialize)]
116pub struct BehavioralSession {
117 pub session: serde_json::Value,
119}
120
121#[derive(Debug, Clone, Serialize)]
123pub struct IngestResponse {
124 pub status: String,
126 pub message: String,
128 #[serde(skip_serializing_if = "Option::is_none")]
130 pub hsi_payload: Option<serde_json::Value>,
131}
132
133#[derive(Serialize)]
135pub struct HealthResponse {
136 pub status: String,
138 pub version: String,
140}
141
142#[derive(Serialize)]
144pub struct ErrorResponse {
145 pub error: String,
147 pub code: String,
149}
150
151async fn health() -> Json<HealthResponse> {
153 Json(HealthResponse {
154 status: "ok".to_string(),
155 version: env!("CARGO_PKG_VERSION").to_string(),
156 })
157}
158
159async fn ingest(
164 State(state): State<Arc<ServerState>>,
165 Json(data): Json<BehavioralSession>,
166) -> Result<Json<IngestResponse>, (StatusCode, Json<ErrorResponse>)> {
167 let session_json = serde_json::to_string(&data.session).map_err(|e| {
169 (
170 StatusCode::BAD_REQUEST,
171 Json(ErrorResponse {
172 error: format!("Invalid session data: {e}"),
173 code: "INVALID_SESSION".to_string(),
174 }),
175 )
176 })?;
177
178 let hsi_json = {
180 let mut processor = state.processor.write().await;
181 processor.process(&session_json).map_err(|e| {
182 (
183 StatusCode::INTERNAL_SERVER_ERROR,
184 Json(ErrorResponse {
185 error: format!("Flux processing failed: {e}"),
186 code: "FLUX_ERROR".to_string(),
187 }),
188 )
189 })?
190 };
191
192 let hsi_snapshot: HsiSnapshot = serde_json::from_str(&hsi_json).map_err(|e| {
194 (
195 StatusCode::INTERNAL_SERVER_ERROR,
196 Json(ErrorResponse {
197 error: format!("Failed to parse HSI output: {e}"),
198 code: "PARSE_ERROR".to_string(),
199 }),
200 )
201 })?;
202
203 let session_obj = data.session.as_object();
206 let get_str = |key: &str| -> Option<String> {
207 session_obj
208 .and_then(|m| m.get(key))
209 .and_then(|v| v.as_str())
210 .map(|s| s.to_string())
211 };
212
213 let session_id = get_str("session_id").unwrap_or_else(|| "unknown-session".to_string());
214 let device_id = get_str("device_id").unwrap_or_else(|| "unknown-device".to_string());
215 let timezone = get_str("timezone").unwrap_or_else(|| "UTC".to_string());
216 let start_time = get_str("start_time").unwrap_or_else(|| hsi_snapshot.observed_at_utc.clone());
217 let end_time = get_str("end_time").unwrap_or_else(|| hsi_snapshot.computed_at_utc.clone());
218
219 let gateway_url = state.gateway_config.ingest_url();
221 let gateway_payload = GatewayBehavioralSession {
222 session: SessionPayload {
223 session_id,
224 device_id,
225 timezone,
226 start_time,
227 end_time,
228 snapshots: vec![hsi_snapshot.clone()],
229 meta: SessionMeta {
230 source: "synheart-sensor-agent-server".to_string(),
231 version: env!("CARGO_PKG_VERSION").to_string(),
232 snapshot_count: 1,
233 },
234 },
235 };
236
237 let response = state
238 .http_client
239 .post(&gateway_url)
240 .header(
241 "Authorization",
242 format!("Bearer {}", state.gateway_config.token),
243 )
244 .header("Content-Type", "application/json")
245 .json(&gateway_payload)
246 .send()
247 .await
248 .map_err(|e| {
249 tracing::error!("Failed to forward to gateway: {}", e);
250 (
251 StatusCode::BAD_GATEWAY,
252 Json(ErrorResponse {
253 error: format!("Gateway forwarding failed: {e}"),
254 code: "GATEWAY_ERROR".to_string(),
255 }),
256 )
257 })?;
258
259 if !response.status().is_success() {
260 let status = response.status();
261 let body = response
262 .text()
263 .await
264 .unwrap_or_else(|_| "Unknown error".to_string());
265 tracing::error!("Gateway returned error {}: {}", status, body);
266 return Err((
267 StatusCode::BAD_GATEWAY,
268 Json(ErrorResponse {
269 error: format!("Gateway returned error: {body}"),
270 code: "GATEWAY_ERROR".to_string(),
271 }),
272 ));
273 }
274
275 state.save_baselines().await;
277
278 Ok(Json(IngestResponse {
279 status: "ok".to_string(),
280 message: "Processed and forwarded to gateway".to_string(),
281 hsi_payload: serde_json::to_value(&hsi_snapshot).ok(),
282 }))
283}
284
285pub async fn run(
287 config: ServerConfig,
288) -> anyhow::Result<(SocketAddr, tokio::sync::oneshot::Sender<()>)> {
289 let state = Arc::new(ServerState::new(&config));
290
291 let app = Router::new()
292 .route("/health", get(health))
293 .route("/ingest", post(ingest))
294 .layer(
295 CorsLayer::new()
296 .allow_origin([
297 HeaderValue::from_static("http://localhost"),
298 HeaderValue::from_static("http://127.0.0.1"),
299 HeaderValue::from_static("chrome-extension://"),
301 ])
302 .allow_methods(Any)
303 .allow_headers(Any),
304 )
305 .with_state(state);
306
307 let addr = SocketAddr::from(([127, 0, 0, 1], config.port));
308 let listener = TcpListener::bind(addr).await?;
309 let actual_addr = listener.local_addr()?;
310
311 tracing::info!("Sensor agent server listening on http://{}", actual_addr);
312
313 let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
314
315 tokio::spawn(async move {
316 if let Err(e) = axum::serve(listener, app)
317 .with_graceful_shutdown(async {
318 let _ = shutdown_rx.await;
319 tracing::info!("Server shutdown signal received");
320 })
321 .await
322 {
323 tracing::error!("Server error: {}", e);
324 }
325 });
326
327 Ok((actual_addr, shutdown_tx))
328}