// haystack_server/app.rs

//! Server builder and startup.
2
use actix_web::body::MessageBody;
use actix_web::dev::{ServiceRequest, ServiceResponse};
use actix_web::http::{Method, StatusCode};
use actix_web::middleware::from_fn;
use actix_web::{App, HttpMessage, HttpServer, web};

use haystack_core::auth::{AuthHeader, parse_auth_header};
use haystack_core::graph::SharedGraph;
use haystack_core::ontology::DefNamespace;

use crate::actions::ActionRegistry;
use crate::auth::AuthManager;
use crate::federation::Federation;
use crate::his_store::HisStore;
use crate::ops;
use crate::state::AppState;
use crate::ws;
use crate::ws::WatchManager;
20
21/// Builder for the Haystack HTTP server.
22pub struct HaystackServer {
23    graph: SharedGraph,
24    namespace: DefNamespace,
25    auth_manager: AuthManager,
26    actions: ActionRegistry,
27    federation: Federation,
28    port: u16,
29    host: String,
30}
31
32impl HaystackServer {
33    /// Create a new server with the given entity graph.
34    pub fn new(graph: SharedGraph) -> Self {
35        Self {
36            graph,
37            namespace: DefNamespace::new(),
38            auth_manager: AuthManager::empty(),
39            actions: ActionRegistry::new(),
40            federation: Federation::new(),
41            port: 8080,
42            host: "127.0.0.1".to_string(),
43        }
44    }
45
46    /// Set the ontology namespace for def/spec operations.
47    pub fn with_namespace(mut self, ns: DefNamespace) -> Self {
48        self.namespace = ns;
49        self
50    }
51
52    /// Set the authentication manager.
53    pub fn with_auth(mut self, auth: AuthManager) -> Self {
54        self.auth_manager = auth;
55        self
56    }
57
58    /// Set the port to listen on (default: 8080).
59    pub fn port(mut self, port: u16) -> Self {
60        self.port = port;
61        self
62    }
63
64    /// Set the host to bind to (default: "127.0.0.1").
65    pub fn host(mut self, host: &str) -> Self {
66        self.host = host.to_string();
67        self
68    }
69
70    /// Set the action registry for the `invokeAction` op.
71    pub fn with_actions(mut self, actions: ActionRegistry) -> Self {
72        self.actions = actions;
73        self
74    }
75
76    /// Set the federation manager for remote connector queries.
77    pub fn with_federation(mut self, federation: Federation) -> Self {
78        self.federation = federation;
79        self
80    }
81
82    /// Start the HTTP server. This blocks until the server is stopped.
83    pub async fn run(self) -> std::io::Result<()> {
84        let state = web::Data::new(AppState {
85            graph: self.graph,
86            namespace: parking_lot::RwLock::new(self.namespace),
87            auth: self.auth_manager,
88            watches: WatchManager::new(),
89            actions: self.actions,
90            his: HisStore::new(),
91            started_at: std::time::Instant::now(),
92            federation: self.federation,
93        });
94
95        // Start federation background sync tasks
96        let _sync_handles = state.federation.start_background_sync();
97
98        log::info!("Starting haystack-server on {}:{}", self.host, self.port);
99
100        HttpServer::new(move || {
101            App::new()
102                .app_data(state.clone())
103                .app_data(actix_web::web::PayloadConfig::default().limit(2 * 1024 * 1024))
104                .wrap(from_fn(auth_middleware))
105                .configure(ops::configure)
106                .route("/api/ws", web::get().to(ws::ws_handler))
107        })
108        .bind((self.host.as_str(), self.port))?
109        .run()
110        .await
111    }
112}
113
/// Determine the permission level required for a given request path.
///
/// The result is one of:
///
/// - `"admin"` for `/api/system` and everything under `/api/system/`,
/// - `"write"` for any path starting with `/api/federation/sync` and for the
///   mutating ops (`pointWrite`, `hisWrite`, `invokeAction`, `loadLib`,
///   `unloadLib`, `import`),
/// - `"read"` for every other path.
///
/// Every path currently maps to `Some(level)`. The `Option` return type is
/// kept so callers can already handle a future "no permission check" answer.
fn required_permission(path: &str) -> Option<&'static str> {
    // System / admin endpoints. The bare "/api/system" is included so that it
    // fails closed instead of falling through to the read level; sibling
    // paths such as "/api/systemic" are deliberately not matched.
    if path == "/api/system" || path.starts_with("/api/system/") {
        return Some("admin");
    }

    // Federation sync (exact and per-connector).
    if path.starts_with("/api/federation/sync") {
        return Some("write");
    }

    let level = match path {
        // Ops that mutate server state.
        "/api/pointWrite" | "/api/hisWrite" | "/api/invokeAction" | "/api/loadLib"
        | "/api/unloadLib" | "/api/import" => "write",
        // Everything else is a read-level operation: /api/about, /api/read,
        // /api/nav, /api/defs, /api/libs, /api/hisRead, /api/watchSub,
        // /api/watchPoll, /api/watchUnsub, /api/close, /api/ops,
        // /api/formats, etc.
        _ => "read",
    };
    Some(level)
}
142
143/// Authentication middleware.
144///
145/// - GET /api/about: pass through (about handles auth itself for SCRAM)
146/// - GET /api/ops, GET /api/formats: pass through (public info)
147/// - All other endpoints: require BEARER token if auth is enabled,
148///   then check the user has the required permission for that route.
149async fn auth_middleware(
150    req: ServiceRequest,
151    next: actix_web::middleware::Next<impl MessageBody>,
152) -> Result<ServiceResponse<impl MessageBody>, actix_web::Error> {
153    let path = req.path().to_string();
154    let method = req.method().clone();
155
156    // Allow about endpoint through (it handles auth itself for SCRAM handshake)
157    if path == "/api/about" {
158        return next.call(req).await;
159    }
160
161    // Allow ops and formats through without auth (public endpoints)
162    if (path == "/api/ops" || path == "/api/formats") && method == actix_web::http::Method::GET {
163        return next.call(req).await;
164    }
165
166    // Check if auth is enabled
167    let auth_enabled = {
168        let state = req
169            .app_data::<web::Data<AppState>>()
170            .expect("AppState must be configured");
171        state.auth.is_enabled()
172    };
173
174    if !auth_enabled {
175        // Auth is not enabled, pass through
176        return next.call(req).await;
177    }
178
179    // Extract and validate BEARER token
180    let auth_header = req
181        .headers()
182        .get("Authorization")
183        .and_then(|v| v.to_str().ok())
184        .map(|s| s.to_string());
185
186    match auth_header {
187        Some(header) => {
188            match parse_auth_header(&header) {
189                Ok(AuthHeader::Bearer { auth_token }) => {
190                    let user = {
191                        let state = req
192                            .app_data::<web::Data<AppState>>()
193                            .expect("AppState must be configured");
194                        state.auth.validate_token(&auth_token)
195                    };
196
197                    match user {
198                        Some(auth_user) => {
199                            // Check permission for the requested path
200                            if let Some(required) = required_permission(&path)
201                                && !AuthManager::check_permission(&auth_user, required)
202                            {
203                                return Err(crate::error::HaystackError::forbidden(format!(
204                                    "user '{}' lacks '{}' permission",
205                                    auth_user.username, required
206                                ))
207                                .into());
208                            }
209
210                            // Inject AuthUser into request extensions
211                            req.extensions_mut().insert(auth_user);
212                            next.call(req).await
213                        }
214                        None => Err(crate::error::HaystackError::new(
215                            "invalid or expired auth token",
216                            actix_web::http::StatusCode::UNAUTHORIZED,
217                        )
218                        .into()),
219                    }
220                }
221                _ => Err(crate::error::HaystackError::new(
222                    "BEARER token required",
223                    actix_web::http::StatusCode::UNAUTHORIZED,
224                )
225                .into()),
226            }
227        }
228        None => Err(crate::error::HaystackError::new(
229            "Authorization header required",
230            actix_web::http::StatusCode::UNAUTHORIZED,
231        )
232        .into()),
233    }
234}
235
#[cfg(test)]
mod tests {
    use super::*;

    use crate::auth::users::hash_password;
    use crate::ws::WatchManager;
    use actix_web::dev::Service;
    use actix_web::middleware::from_fn;
    use actix_web::test as actix_test;
    use actix_web::{App, HttpResponse};

    // ---- Unit tests for `required_permission` ----

    /// Assert that every path in `paths` requires the `expected` permission.
    fn assert_permission(expected: &'static str, paths: &[&str]) {
        for &path in paths {
            assert_eq!(required_permission(path), Some(expected), "path {path}");
        }
    }

    #[test]
    fn required_permission_read_ops() {
        assert_permission(
            "read",
            &[
                "/api/read",
                "/api/nav",
                "/api/defs",
                "/api/libs",
                "/api/hisRead",
                "/api/watchSub",
                "/api/watchPoll",
                "/api/watchUnsub",
                "/api/close",
                "/api/about",
                "/api/ops",
                "/api/formats",
            ],
        );
    }

    #[test]
    fn required_permission_write_ops() {
        assert_permission(
            "write",
            &[
                "/api/pointWrite",
                "/api/hisWrite",
                "/api/invokeAction",
                "/api/import",
                "/api/federation/sync",
            ],
        );
    }

    #[test]
    fn required_permission_federation_sync_named() {
        assert_permission("write", &["/api/federation/sync/building-a"]);
    }

    #[test]
    fn required_permission_admin_ops() {
        assert_permission(
            "admin",
            &["/api/system/backup", "/api/system/restart", "/api/system/"],
        );
    }

    // ---- Integration tests for the auth middleware ----

    /// Permissions granted to the read-only user in the tests below.
    const VIEWER: &[&str] = &["read"];
    /// Permissions granted to the read/write user in the tests below.
    const WRITER: &[&str] = &["read", "write"];

    /// Build an AppState with auth enabled: one admin user and one read-only viewer.
    fn test_state() -> web::Data<AppState> {
        let hash = hash_password("s3cret");
        let users_toml = format!(
            r#"
[users.admin]
password_hash = "{hash}"
permissions = ["read", "write", "admin"]

[users.viewer]
password_hash = "{hash}"
permissions = ["read"]
"#
        );
        let auth = AuthManager::from_toml_str(&users_toml).unwrap();

        web::Data::new(AppState {
            graph: haystack_core::graph::SharedGraph::new(haystack_core::graph::EntityGraph::new()),
            namespace: parking_lot::RwLock::new(DefNamespace::new()),
            auth,
            watches: WatchManager::new(),
            actions: ActionRegistry::new(),
            his: HisStore::new(),
            started_at: std::time::Instant::now(),
            federation: Federation::new(),
        })
    }

    /// Register a fresh random token for `username` with `permissions`
    /// directly in the AuthManager and return the token.
    fn insert_token(
        state: &web::Data<AppState>,
        username: &str,
        permissions: Vec<String>,
    ) -> String {
        let token = uuid::Uuid::new_v4().to_string();
        state.auth.inject_token(
            token.clone(),
            crate::auth::AuthUser {
                username: username.to_owned(),
                permissions,
            },
        );
        token
    }

    /// Handler used for every route in the test app: always answers 200 "ok".
    async fn dummy_handler() -> HttpResponse {
        HttpResponse::Ok().body("ok")
    }

    /// Create a test app with the auth middleware and dummy routes.
    fn test_app(
        state: web::Data<AppState>,
    ) -> App<
        impl actix_web::dev::ServiceFactory<
            ServiceRequest,
            Config = (),
            Response = ServiceResponse<impl MessageBody>,
            Error = actix_web::Error,
            InitError = (),
        >,
    > {
        let app = App::new().app_data(state).wrap(from_fn(auth_middleware));

        // Public endpoints (GET).
        let app = app
            .route("/api/about", web::get().to(dummy_handler))
            .route("/api/ops", web::get().to(dummy_handler))
            .route("/api/formats", web::get().to(dummy_handler));

        // Protected endpoints (POST).
        app.route("/api/read", web::post().to(dummy_handler))
            .route("/api/hisRead", web::post().to(dummy_handler))
            .route("/api/pointWrite", web::post().to(dummy_handler))
            .route("/api/hisWrite", web::post().to(dummy_handler))
            .route("/api/invokeAction", web::post().to(dummy_handler))
            .route("/api/close", web::post().to(dummy_handler))
            .route("/api/system/backup", web::post().to(dummy_handler))
    }

    /// Call the service and reduce the outcome to a numeric status code,
    /// whether it is a normal response or a middleware error (which
    /// implements ResponseError).
    async fn call_status(
        app: &impl Service<
            actix_http::Request,
            Response = ServiceResponse<impl MessageBody>,
            Error = actix_web::Error,
        >,
        req: actix_http::Request,
    ) -> u16 {
        let status = match app.call(req).await {
            Ok(response) => response.status(),
            Err(error) => error.as_response_error().status_code(),
        };
        status.as_u16()
    }

    /// Send `request` to a freshly built test app in which no token has been
    /// registered, and return the resulting status code.
    async fn status_of(request: actix_test::TestRequest) -> u16 {
        let app = actix_test::init_service(test_app(test_state())).await;
        call_status(&app, request.to_request()).await
    }

    /// Register a token for `username` holding `permissions`, POST to `uri`
    /// with that token as a BEARER header, and return the status code.
    async fn post_status_as(uri: &str, username: &str, permissions: &[&str]) -> u16 {
        let state = test_state();
        let granted = permissions.iter().map(|&p| p.to_owned()).collect();
        let token = insert_token(&state, username, granted);
        let app = actix_test::init_service(test_app(state)).await;

        let request = actix_test::TestRequest::post()
            .uri(uri)
            .insert_header(("Authorization", format!("BEARER authToken={token}")))
            .to_request();
        call_status(&app, request).await
    }

    #[actix_rt::test]
    async fn about_passes_through_without_auth() {
        let request = actix_test::TestRequest::get().uri("/api/about");
        assert_eq!(status_of(request).await, 200);
    }

    #[actix_rt::test]
    async fn ops_passes_through_without_auth() {
        let request = actix_test::TestRequest::get().uri("/api/ops");
        assert_eq!(status_of(request).await, 200);
    }

    #[actix_rt::test]
    async fn formats_passes_through_without_auth() {
        let request = actix_test::TestRequest::get().uri("/api/formats");
        assert_eq!(status_of(request).await, 200);
    }

    #[actix_rt::test]
    async fn protected_endpoint_without_token_returns_401() {
        let request = actix_test::TestRequest::post().uri("/api/read");
        assert_eq!(status_of(request).await, 401);
    }

    #[actix_rt::test]
    async fn viewer_can_read() {
        assert_eq!(post_status_as("/api/read", "viewer", VIEWER).await, 200);
    }

    #[actix_rt::test]
    async fn viewer_cannot_write() {
        assert_eq!(
            post_status_as("/api/pointWrite", "viewer", VIEWER).await,
            403
        );
    }

    #[actix_rt::test]
    async fn viewer_cannot_invoke_action() {
        assert_eq!(
            post_status_as("/api/invokeAction", "viewer", VIEWER).await,
            403
        );
    }

    #[actix_rt::test]
    async fn viewer_cannot_his_write() {
        assert_eq!(post_status_as("/api/hisWrite", "viewer", VIEWER).await, 403);
    }

    #[actix_rt::test]
    async fn writer_can_write() {
        assert_eq!(
            post_status_as("/api/pointWrite", "writer", WRITER).await,
            200
        );
    }

    #[actix_rt::test]
    async fn admin_can_access_system() {
        assert_eq!(
            post_status_as("/api/system/backup", "admin", &["admin"]).await,
            200
        );
    }

    #[actix_rt::test]
    async fn viewer_cannot_access_system() {
        assert_eq!(
            post_status_as("/api/system/backup", "viewer", VIEWER).await,
            403
        );
    }

    #[actix_rt::test]
    async fn writer_cannot_access_system() {
        assert_eq!(
            post_status_as("/api/system/backup", "writer", WRITER).await,
            403
        );
    }

    #[actix_rt::test]
    async fn viewer_can_close() {
        assert_eq!(post_status_as("/api/close", "viewer", VIEWER).await, 200);
    }

    #[actix_rt::test]
    async fn viewer_can_his_read() {
        assert_eq!(post_status_as("/api/hisRead", "viewer", VIEWER).await, 200);
    }

    #[actix_rt::test]
    async fn invalid_token_returns_401() {
        // The header is well-formed, but the token was never registered.
        let request = actix_test::TestRequest::post()
            .uri("/api/read")
            .insert_header(("Authorization", "BEARER authToken=bogus-token"));
        assert_eq!(status_of(request).await, 401);
    }
}