Skip to main content

synheart_sensor_agent/
server.rs

1//! HTTP server for receiving behavioural data from the Chrome extension.
2//!
3//! This module provides an HTTP server that:
4//! - Accepts raw behavioural data from the Chrome extension via `POST /ingest`
5//! - Processes it through synheart-flux's `BehaviorProcessor`
6//! - Sends processed HSI to the local gateway via [`GatewayClient`](crate::gateway::GatewayClient)
7//!
8//! Use [`ServerConfig`] to configure the server and `run()` to start it.
9//!
10//! # Architecture
11//!
12//! ```text
13//! Chrome Extension ──→ POST /ingest ──→ sensor-agent ──→ gateway ──→ Syni Life
14//!                                           ↓
15//!                                    [Flux Processing]
16//! ```
17
18use crate::core::HsiSnapshot;
19use crate::gateway::GatewayConfig;
20use crate::gateway::{BehavioralSession as GatewayBehavioralSession, SessionMeta, SessionPayload};
21use axum::{
22    extract::State,
23    http::{HeaderValue, StatusCode},
24    routing::{get, post},
25    Json, Router,
26};
27use serde::{Deserialize, Serialize};
28use std::net::SocketAddr;
29use std::path::PathBuf;
30use std::sync::Arc;
31use synheart_flux::BehaviorProcessor;
32use tokio::net::TcpListener;
33use tokio::sync::RwLock;
34use tower_http::cors::{Any, CorsLayer};
35
36/// Server configuration
37#[derive(Debug, Clone)]
38pub struct ServerConfig {
39    /// Port to bind to (0 for random)
40    pub port: u16,
41    /// Gateway configuration for forwarding processed HSI
42    pub gateway_config: GatewayConfig,
43    /// State directory for baselines
44    pub state_dir: PathBuf,
45}
46
47impl ServerConfig {
48    /// Create a new server configuration
49    pub fn new(port: u16, gateway_config: GatewayConfig, state_dir: PathBuf) -> Self {
50        Self {
51            port,
52            gateway_config,
53            state_dir,
54        }
55    }
56}
57
58/// Shared server state
59pub struct ServerState {
60    /// Flux behavior processor
61    processor: RwLock<BehaviorProcessor>,
62    /// Gateway configuration
63    gateway_config: GatewayConfig,
64    /// HTTP client for gateway
65    http_client: reqwest::Client,
66    /// State directory
67    state_dir: PathBuf,
68}
69
70impl ServerState {
71    /// Create new server state
72    pub fn new(config: &ServerConfig) -> Self {
73        let mut processor = BehaviorProcessor::new();
74
75        // Load baselines if they exist
76        let baseline_path = config
77            .state_dir
78            .join("state")
79            .join("behavior_baselines.json");
80        if baseline_path.exists() {
81            if let Ok(json) = std::fs::read_to_string(&baseline_path) {
82                if let Err(e) = processor.load_baselines(&json) {
83                    tracing::warn!("Failed to load baselines: {}", e);
84                }
85            }
86        }
87
88        Self {
89            processor: RwLock::new(processor),
90            gateway_config: config.gateway_config.clone(),
91            http_client: reqwest::Client::builder()
92                .timeout(std::time::Duration::from_secs(10))
93                .build()
94                .expect("Failed to create HTTP client"),
95            state_dir: config.state_dir.clone(),
96        }
97    }
98
99    /// Save baselines to disk
100    async fn save_baselines(&self) {
101        let baseline_dir = self.state_dir.join("state");
102        let _ = std::fs::create_dir_all(&baseline_dir);
103
104        let processor = self.processor.read().await;
105        if let Ok(json) = processor.save_baselines() {
106            let path = baseline_dir.join("behavior_baselines.json");
107            if let Err(e) = std::fs::write(&path, json) {
108                tracing::warn!("Failed to save baselines: {}", e);
109            }
110        }
111    }
112}
113
114/// Behavioral session data from Chrome extension
115#[derive(Debug, Clone, Serialize, Deserialize)]
116pub struct BehavioralSession {
117    /// The behavioral session to process
118    pub session: serde_json::Value,
119}
120
121/// Response from the `POST /ingest` endpoint.
122#[derive(Debug, Clone, Serialize)]
123pub struct IngestResponse {
124    /// Processing status (`"ok"` or `"error"`).
125    pub status: String,
126    /// Human-readable description of the result.
127    pub message: String,
128    /// The HSI 1.0 payload produced by Flux processing, if available.
129    #[serde(skip_serializing_if = "Option::is_none")]
130    pub hsi_payload: Option<serde_json::Value>,
131}
132
133/// Response from the `GET /health` endpoint.
134#[derive(Serialize)]
135pub struct HealthResponse {
136    /// Service health status (always `"ok"` when reachable).
137    pub status: String,
138    /// Agent version string.
139    pub version: String,
140}
141
142/// Generic error response returned by the server.
143#[derive(Serialize)]
144pub struct ErrorResponse {
145    /// Human-readable error message.
146    pub error: String,
147    /// Machine-readable error code.
148    pub code: String,
149}
150
151/// GET /health
152async fn health() -> Json<HealthResponse> {
153    Json(HealthResponse {
154        status: "ok".to_string(),
155        version: env!("CARGO_PKG_VERSION").to_string(),
156    })
157}
158
159/// POST /ingest
160///
161/// Accepts raw behavioral data from Chrome extension, processes with flux,
162/// and forwards to gateway.
163async fn ingest(
164    State(state): State<Arc<ServerState>>,
165    Json(data): Json<BehavioralSession>,
166) -> Result<Json<IngestResponse>, (StatusCode, Json<ErrorResponse>)> {
167    // Serialize session for flux processing
168    let session_json = serde_json::to_string(&data.session).map_err(|e| {
169        (
170            StatusCode::BAD_REQUEST,
171            Json(ErrorResponse {
172                error: format!("Invalid session data: {e}"),
173                code: "INVALID_SESSION".to_string(),
174            }),
175        )
176    })?;
177
178    // Process through flux
179    let hsi_json = {
180        let mut processor = state.processor.write().await;
181        processor.process(&session_json).map_err(|e| {
182            (
183                StatusCode::INTERNAL_SERVER_ERROR,
184                Json(ErrorResponse {
185                    error: format!("Flux processing failed: {e}"),
186                    code: "FLUX_ERROR".to_string(),
187                }),
188            )
189        })?
190    };
191
192    // Parse HSI payload (we forward as a snapshot to core-gateway)
193    let hsi_snapshot: HsiSnapshot = serde_json::from_str(&hsi_json).map_err(|e| {
194        (
195            StatusCode::INTERNAL_SERVER_ERROR,
196            Json(ErrorResponse {
197                error: format!("Failed to parse HSI output: {e}"),
198                code: "PARSE_ERROR".to_string(),
199            }),
200        )
201    })?;
202
203    // Extract session fields from the inbound payload for gateway session envelope.
204    // (If the Chrome extension omits fields, fall back to safe defaults.)
205    let session_obj = data.session.as_object();
206    let get_str = |key: &str| -> Option<String> {
207        session_obj
208            .and_then(|m| m.get(key))
209            .and_then(|v| v.as_str())
210            .map(|s| s.to_string())
211    };
212
213    let session_id = get_str("session_id").unwrap_or_else(|| "unknown-session".to_string());
214    let device_id = get_str("device_id").unwrap_or_else(|| "unknown-device".to_string());
215    let timezone = get_str("timezone").unwrap_or_else(|| "UTC".to_string());
216    let start_time = get_str("start_time").unwrap_or_else(|| hsi_snapshot.observed_at_utc.clone());
217    let end_time = get_str("end_time").unwrap_or_else(|| hsi_snapshot.computed_at_utc.clone());
218
219    // Forward to core-gateway behavioral ingest endpoint.
220    let gateway_url = state.gateway_config.ingest_url();
221    let gateway_payload = GatewayBehavioralSession {
222        session: SessionPayload {
223            session_id,
224            device_id,
225            timezone,
226            start_time,
227            end_time,
228            snapshots: vec![hsi_snapshot.clone()],
229            meta: SessionMeta {
230                source: "synheart-sensor-agent-server".to_string(),
231                version: env!("CARGO_PKG_VERSION").to_string(),
232                snapshot_count: 1,
233            },
234        },
235    };
236
237    let response = state
238        .http_client
239        .post(&gateway_url)
240        .header(
241            "Authorization",
242            format!("Bearer {}", state.gateway_config.token),
243        )
244        .header("Content-Type", "application/json")
245        .json(&gateway_payload)
246        .send()
247        .await
248        .map_err(|e| {
249            tracing::error!("Failed to forward to gateway: {}", e);
250            (
251                StatusCode::BAD_GATEWAY,
252                Json(ErrorResponse {
253                    error: format!("Gateway forwarding failed: {e}"),
254                    code: "GATEWAY_ERROR".to_string(),
255                }),
256            )
257        })?;
258
259    if !response.status().is_success() {
260        let status = response.status();
261        let body = response
262            .text()
263            .await
264            .unwrap_or_else(|_| "Unknown error".to_string());
265        tracing::error!("Gateway returned error {}: {}", status, body);
266        return Err((
267            StatusCode::BAD_GATEWAY,
268            Json(ErrorResponse {
269                error: format!("Gateway returned error: {body}"),
270                code: "GATEWAY_ERROR".to_string(),
271            }),
272        ));
273    }
274
275    // Save baselines periodically
276    state.save_baselines().await;
277
278    Ok(Json(IngestResponse {
279        status: "ok".to_string(),
280        message: "Processed and forwarded to gateway".to_string(),
281        hsi_payload: serde_json::to_value(&hsi_snapshot).ok(),
282    }))
283}
284
285/// Run the HTTP server
286pub async fn run(
287    config: ServerConfig,
288) -> anyhow::Result<(SocketAddr, tokio::sync::oneshot::Sender<()>)> {
289    let state = Arc::new(ServerState::new(&config));
290
291    let app = Router::new()
292        .route("/health", get(health))
293        .route("/ingest", post(ingest))
294        .layer(
295            CorsLayer::new()
296                .allow_origin([
297                    HeaderValue::from_static("http://localhost"),
298                    HeaderValue::from_static("http://127.0.0.1"),
299                    // Allow chrome-extension origins
300                    HeaderValue::from_static("chrome-extension://"),
301                ])
302                .allow_methods(Any)
303                .allow_headers(Any),
304        )
305        .with_state(state);
306
307    let addr = SocketAddr::from(([127, 0, 0, 1], config.port));
308    let listener = TcpListener::bind(addr).await?;
309    let actual_addr = listener.local_addr()?;
310
311    tracing::info!("Sensor agent server listening on http://{}", actual_addr);
312
313    let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
314
315    tokio::spawn(async move {
316        if let Err(e) = axum::serve(listener, app)
317            .with_graceful_shutdown(async {
318                let _ = shutdown_rx.await;
319                tracing::info!("Server shutdown signal received");
320            })
321            .await
322        {
323            tracing::error!("Server error: {}", e);
324        }
325    });
326
327    Ok((actual_addr, shutdown_tx))
328}