Skip to main content

haystack_server/
app.rs

1//! Server builder and startup.
2
3use actix_web::body::MessageBody;
4use actix_web::dev::{ServiceRequest, ServiceResponse};
5use actix_web::middleware::from_fn;
6use actix_web::{App, HttpMessage, HttpServer, web};
7
8use haystack_core::auth::{AuthHeader, parse_auth_header};
9use haystack_core::graph::SharedGraph;
10use haystack_core::ontology::DefNamespace;
11
12use crate::actions::ActionRegistry;
13use crate::auth::AuthManager;
14use crate::federation::Federation;
15use crate::his_store::HisStore;
16use crate::ops;
17use crate::state::AppState;
18use crate::ws;
19use crate::ws::WatchManager;
20
21/// Builder for the Haystack HTTP server.
22pub struct HaystackServer {
23    graph: SharedGraph,
24    namespace: DefNamespace,
25    auth_manager: AuthManager,
26    actions: ActionRegistry,
27    federation: Federation,
28    port: u16,
29    host: String,
30}
31
32impl HaystackServer {
33    /// Create a new server with the given entity graph.
34    pub fn new(graph: SharedGraph) -> Self {
35        Self {
36            graph,
37            namespace: DefNamespace::new(),
38            auth_manager: AuthManager::empty(),
39            actions: ActionRegistry::new(),
40            federation: Federation::new(),
41            port: 8080,
42            host: "127.0.0.1".to_string(),
43        }
44    }
45
46    /// Set the ontology namespace for def/spec operations.
47    pub fn with_namespace(mut self, ns: DefNamespace) -> Self {
48        self.namespace = ns;
49        self
50    }
51
52    /// Set the authentication manager.
53    pub fn with_auth(mut self, auth: AuthManager) -> Self {
54        self.auth_manager = auth;
55        self
56    }
57
58    /// Set the port to listen on (default: 8080).
59    pub fn port(mut self, port: u16) -> Self {
60        self.port = port;
61        self
62    }
63
64    /// Set the host to bind to (default: "127.0.0.1").
65    pub fn host(mut self, host: &str) -> Self {
66        self.host = host.to_string();
67        self
68    }
69
70    /// Set the action registry for the `invokeAction` op.
71    pub fn with_actions(mut self, actions: ActionRegistry) -> Self {
72        self.actions = actions;
73        self
74    }
75
76    /// Set the federation manager for remote connector queries.
77    pub fn with_federation(mut self, federation: Federation) -> Self {
78        self.federation = federation;
79        self
80    }
81
82    /// Start the HTTP server. This blocks until the server is stopped.
83    pub async fn run(self) -> std::io::Result<()> {
84        let state = web::Data::new(AppState {
85            graph: self.graph,
86            namespace: parking_lot::RwLock::new(self.namespace),
87            auth: self.auth_manager,
88            watches: WatchManager::new(),
89            actions: self.actions,
90            his: HisStore::new(),
91            started_at: std::time::Instant::now(),
92            federation: self.federation,
93        });
94
95        // Start federation background sync tasks
96        let _sync_handles = state.federation.start_background_sync();
97
98        log::info!("Starting haystack-server on {}:{}", self.host, self.port);
99
100        HttpServer::new(move || {
101            App::new()
102                .app_data(state.clone())
103                .app_data(actix_web::web::PayloadConfig::default().limit(2 * 1024 * 1024))
104                .wrap(from_fn(auth_middleware))
105                .configure(ops::configure)
106                .route("/api/ws", web::get().to(ws::ws_handler))
107        })
108        .bind((self.host.as_str(), self.port))?
109        .run()
110        .await
111    }
112}
113
/// Map a request path to the permission level a caller must hold.
///
/// Rules, checked in this order:
///
/// 1. Paths starting with `/api/system/` need `"admin"`.
/// 2. Paths starting with `/api/federation/sync` (the bare endpoint and the
///    per-connector form) need `"write"`.
/// 3. The mutating ops `pointWrite`, `hisWrite`, `invokeAction`, `loadLib`,
///    `unloadLib` and `import` (exact path match) need `"write"`.
/// 4. Every other path is treated as a read-level operation and needs
///    `"read"`.
///
/// The return type is an `Option` so callers can express "no permission
/// check needed", but every branch here yields `Some`.
fn required_permission(path: &str) -> Option<&'static str> {
    let is_admin = path.starts_with("/api/system/");
    let is_write = path.starts_with("/api/federation/sync")
        || matches!(
            path,
            "/api/pointWrite"
                | "/api/hisWrite"
                | "/api/invokeAction"
                | "/api/loadLib"
                | "/api/unloadLib"
                | "/api/import"
        );

    // Admin takes precedence over write, which takes precedence over read.
    let level = if is_admin {
        "admin"
    } else if is_write {
        "write"
    } else {
        "read"
    };
    Some(level)
}
146
147/// Authentication middleware.
148///
149/// - GET /api/about: pass through (about handles auth itself for SCRAM)
150/// - GET /api/ops, GET /api/formats: pass through (public info)
151/// - All other endpoints: require BEARER token if auth is enabled,
152///   then check the user has the required permission for that route.
153async fn auth_middleware(
154    req: ServiceRequest,
155    next: actix_web::middleware::Next<impl MessageBody>,
156) -> Result<ServiceResponse<impl MessageBody>, actix_web::Error> {
157    let path = req.path().to_string();
158    let method = req.method().clone();
159
160    // Allow about endpoint through (it handles auth itself for SCRAM handshake)
161    if path == "/api/about" {
162        return next.call(req).await;
163    }
164
165    // Allow ops and formats through without auth (public endpoints)
166    if (path == "/api/ops" || path == "/api/formats") && method == actix_web::http::Method::GET {
167        return next.call(req).await;
168    }
169
170    // Check if auth is enabled
171    let auth_enabled = {
172        let state = req
173            .app_data::<web::Data<AppState>>()
174            .expect("AppState must be configured");
175        state.auth.is_enabled()
176    };
177
178    if !auth_enabled {
179        // Auth is not enabled, pass through
180        return next.call(req).await;
181    }
182
183    // Extract and validate BEARER token
184    let auth_header = req
185        .headers()
186        .get("Authorization")
187        .and_then(|v| v.to_str().ok())
188        .map(|s| s.to_string());
189
190    match auth_header {
191        Some(header) => {
192            match parse_auth_header(&header) {
193                Ok(AuthHeader::Bearer { auth_token }) => {
194                    let user = {
195                        let state = req
196                            .app_data::<web::Data<AppState>>()
197                            .expect("AppState must be configured");
198                        state.auth.validate_token(&auth_token)
199                    };
200
201                    match user {
202                        Some(auth_user) => {
203                            // Check permission for the requested path
204                            if let Some(required) = required_permission(&path)
205                                && !AuthManager::check_permission(&auth_user, required)
206                            {
207                                return Err(crate::error::HaystackError::forbidden(format!(
208                                    "user '{}' lacks '{}' permission",
209                                    auth_user.username, required
210                                ))
211                                .into());
212                            }
213
214                            // Inject AuthUser into request extensions
215                            req.extensions_mut().insert(auth_user);
216                            next.call(req).await
217                        }
218                        None => Err(crate::error::HaystackError::new(
219                            "invalid or expired auth token",
220                            actix_web::http::StatusCode::UNAUTHORIZED,
221                        )
222                        .into()),
223                    }
224                }
225                _ => Err(crate::error::HaystackError::new(
226                    "BEARER token required",
227                    actix_web::http::StatusCode::UNAUTHORIZED,
228                )
229                .into()),
230            }
231        }
232        None => Err(crate::error::HaystackError::new(
233            "Authorization header required",
234            actix_web::http::StatusCode::UNAUTHORIZED,
235        )
236        .into()),
237    }
238}
239
#[cfg(test)]
mod tests {
    use super::*;

    use crate::auth::users::hash_password;
    use crate::ws::WatchManager;
    use actix_web::dev::Service;
    use actix_web::middleware::from_fn;
    use actix_web::test as actix_test;
    use actix_web::{App, HttpResponse};

    // ---- Unit tests for `required_permission` ----

    #[test]
    fn required_permission_read_ops() {
        let read_paths = [
            "/api/read",
            "/api/nav",
            "/api/defs",
            "/api/libs",
            "/api/hisRead",
            "/api/watchSub",
            "/api/watchPoll",
            "/api/watchUnsub",
            "/api/close",
            "/api/about",
            "/api/ops",
            "/api/formats",
        ];
        for path in read_paths {
            assert_eq!(required_permission(path), Some("read"), "path: {path}");
        }
    }

    #[test]
    fn required_permission_write_ops() {
        let write_paths = [
            "/api/pointWrite",
            "/api/hisWrite",
            "/api/invokeAction",
            "/api/import",
            "/api/federation/sync",
        ];
        for path in write_paths {
            assert_eq!(required_permission(path), Some("write"), "path: {path}");
        }
    }

    #[test]
    fn required_permission_federation_sync_named() {
        let level = required_permission("/api/federation/sync/building-a");
        assert_eq!(level, Some("write"));
    }

    #[test]
    fn required_permission_admin_ops() {
        let admin_paths = ["/api/system/backup", "/api/system/restart", "/api/system/"];
        for path in admin_paths {
            assert_eq!(required_permission(path), Some("admin"), "path: {path}");
        }
    }

    // ---- Integration tests for the auth middleware ----

    /// Build an `AppState` whose auth manager is loaded from a TOML config
    /// declaring two users: `admin` (read/write/admin) and `viewer` (read).
    fn test_state() -> web::Data<AppState> {
        let hash = hash_password("s3cret");
        let users_toml = format!(
            r#"
[users.admin]
password_hash = "{hash}"
permissions = ["read", "write", "admin"]

[users.viewer]
password_hash = "{hash}"
permissions = ["read"]
"#
        );
        let graph =
            haystack_core::graph::SharedGraph::new(haystack_core::graph::EntityGraph::new());
        let app_state = AppState {
            graph,
            namespace: parking_lot::RwLock::new(DefNamespace::new()),
            auth: AuthManager::from_toml_str(&users_toml).unwrap(),
            watches: WatchManager::new(),
            actions: ActionRegistry::new(),
            his: HisStore::new(),
            started_at: std::time::Instant::now(),
            federation: Federation::new(),
        };
        web::Data::new(app_state)
    }

    /// Register a freshly generated token for `username` with the given
    /// permissions directly in the auth manager, and return the token.
    fn insert_token(
        state: &web::Data<AppState>,
        username: &str,
        permissions: Vec<String>,
    ) -> String {
        let token = uuid::Uuid::new_v4().to_string();
        state.auth.inject_token(
            token.clone(),
            crate::auth::AuthUser {
                username: username.to_owned(),
                permissions,
            },
        );
        token
    }

    /// Handler used for every test route: always answers `200 ok`.
    async fn dummy_handler() -> HttpResponse {
        HttpResponse::Ok().body("ok")
    }

    /// Assemble an app that wraps the dummy routes in the auth middleware.
    fn test_app(
        state: web::Data<AppState>,
    ) -> App<
        impl actix_web::dev::ServiceFactory<
            ServiceRequest,
            Config = (),
            Response = ServiceResponse<impl MessageBody>,
            Error = actix_web::Error,
            InitError = (),
        >,
    > {
        const GET_ROUTES: [&str; 3] = ["/api/about", "/api/ops", "/api/formats"];
        const POST_ROUTES: [&str; 7] = [
            "/api/read",
            "/api/hisRead",
            "/api/pointWrite",
            "/api/hisWrite",
            "/api/invokeAction",
            "/api/close",
            "/api/system/backup",
        ];

        let app = App::new().app_data(state).wrap(from_fn(auth_middleware));
        let app = GET_ROUTES.into_iter().fold(app, |app, path| {
            app.route(path, web::get().to(dummy_handler))
        });
        POST_ROUTES.into_iter().fold(app, |app, path| {
            app.route(path, web::post().to(dummy_handler))
        })
    }

    /// Call the service and reduce the outcome to a numeric status code.
    ///
    /// Middleware rejections surface as `Err`, so the status is taken from
    /// the error's `ResponseError` implementation in that case.
    async fn call_status(
        app: &impl Service<
            actix_http::Request,
            Response = ServiceResponse<impl MessageBody>,
            Error = actix_web::Error,
        >,
        req: actix_http::Request,
    ) -> u16 {
        let outcome = app.call(req).await;
        outcome
            .map(|resp| resp.status())
            .unwrap_or_else(|err| err.as_response_error().status_code())
            .as_u16()
    }

    /// Spin up the test app on `state`, send `request`, return the status.
    async fn run_request(state: web::Data<AppState>, request: actix_test::TestRequest) -> u16 {
        let app = actix_test::init_service(test_app(state)).await;
        call_status(&app, request.to_request()).await
    }

    /// Send `request` without any credentials against a fresh test state.
    async fn anonymous_status(request: actix_test::TestRequest) -> u16 {
        run_request(test_state(), request).await
    }

    /// Send `request` with a BEARER token injected for `username` holding
    /// exactly `permissions`, against a fresh test state.
    async fn authorized_status(
        username: &str,
        permissions: &[&str],
        request: actix_test::TestRequest,
    ) -> u16 {
        let state = test_state();
        let granted: Vec<String> = permissions.iter().map(|p| p.to_string()).collect();
        let token = insert_token(&state, username, granted);
        let request =
            request.insert_header(("Authorization", format!("BEARER authToken={token}")));
        run_request(state, request).await
    }

    #[actix_rt::test]
    async fn about_passes_through_without_auth() {
        let request = actix_test::TestRequest::get().uri("/api/about");
        assert_eq!(anonymous_status(request).await, 200);
    }

    #[actix_rt::test]
    async fn ops_passes_through_without_auth() {
        let request = actix_test::TestRequest::get().uri("/api/ops");
        assert_eq!(anonymous_status(request).await, 200);
    }

    #[actix_rt::test]
    async fn formats_passes_through_without_auth() {
        let request = actix_test::TestRequest::get().uri("/api/formats");
        assert_eq!(anonymous_status(request).await, 200);
    }

    #[actix_rt::test]
    async fn protected_endpoint_without_token_returns_401() {
        let request = actix_test::TestRequest::post().uri("/api/read");
        assert_eq!(anonymous_status(request).await, 401);
    }

    #[actix_rt::test]
    async fn viewer_can_read() {
        let request = actix_test::TestRequest::post().uri("/api/read");
        assert_eq!(authorized_status("viewer", &["read"], request).await, 200);
    }

    #[actix_rt::test]
    async fn viewer_cannot_write() {
        let request = actix_test::TestRequest::post().uri("/api/pointWrite");
        assert_eq!(authorized_status("viewer", &["read"], request).await, 403);
    }

    #[actix_rt::test]
    async fn viewer_cannot_invoke_action() {
        let request = actix_test::TestRequest::post().uri("/api/invokeAction");
        assert_eq!(authorized_status("viewer", &["read"], request).await, 403);
    }

    #[actix_rt::test]
    async fn viewer_cannot_his_write() {
        let request = actix_test::TestRequest::post().uri("/api/hisWrite");
        assert_eq!(authorized_status("viewer", &["read"], request).await, 403);
    }

    #[actix_rt::test]
    async fn writer_can_write() {
        let request = actix_test::TestRequest::post().uri("/api/pointWrite");
        assert_eq!(
            authorized_status("writer", &["read", "write"], request).await,
            200
        );
    }

    #[actix_rt::test]
    async fn admin_can_access_system() {
        let request = actix_test::TestRequest::post().uri("/api/system/backup");
        assert_eq!(authorized_status("admin", &["admin"], request).await, 200);
    }

    #[actix_rt::test]
    async fn viewer_cannot_access_system() {
        let request = actix_test::TestRequest::post().uri("/api/system/backup");
        assert_eq!(authorized_status("viewer", &["read"], request).await, 403);
    }

    #[actix_rt::test]
    async fn writer_cannot_access_system() {
        let request = actix_test::TestRequest::post().uri("/api/system/backup");
        assert_eq!(
            authorized_status("writer", &["read", "write"], request).await,
            403
        );
    }

    #[actix_rt::test]
    async fn viewer_can_close() {
        let request = actix_test::TestRequest::post().uri("/api/close");
        assert_eq!(authorized_status("viewer", &["read"], request).await, 200);
    }

    #[actix_rt::test]
    async fn viewer_can_his_read() {
        let request = actix_test::TestRequest::post().uri("/api/hisRead");
        assert_eq!(authorized_status("viewer", &["read"], request).await, 200);
    }

    #[actix_rt::test]
    async fn invalid_token_returns_401() {
        // The token is well-formed but was never registered with the manager.
        let request = actix_test::TestRequest::post()
            .uri("/api/read")
            .insert_header(("Authorization", "BEARER authToken=bogus-token"));
        assert_eq!(anonymous_status(request).await, 401);
    }
}