Skip to main content

daemon8_ingest/
lib.rs

1// SPDX-License-Identifier: LicenseRef-FCL-1.0-ALv2
2// Copyright (c) 2026 Havy.tech, LLC
3
4mod error;
5pub mod normalize;
6pub mod udp;
7#[cfg(unix)]
8pub mod unix;
9
10pub use error::{IngestError, Result};
11
12use axum::{
13    Router,
14    extract::State,
15    http::StatusCode,
16    routing::{get, post},
17};
18use serde_json::{Value, json};
19use tokio::sync::mpsc::UnboundedSender;
20
21use daemon8_types::Observation;
22
23#[derive(Clone)]
24struct IngestState {
25    tx: UnboundedSender<Observation>,
26}
27
28pub fn ingest_router(tx: UnboundedSender<Observation>) -> Router {
29    let state = IngestState { tx };
30
31    Router::new()
32        .route("/ingest", post(handle_ingest))
33        .route("/ingest/batch", post(handle_batch))
34        .route("/health", get(handle_health))
35        .with_state(state)
36}
37
38async fn handle_health() -> &'static str {
39    "ok"
40}
41
42async fn handle_ingest(
43    State(state): State<IngestState>,
44    axum::Json(body): axum::Json<Value>,
45) -> (StatusCode, axum::Json<Value>) {
46    let obs = normalize::normalize(body);
47
48    tracing::debug!(
49        kind = ?obs.kind,
50        severity = ?obs.severity,
51        origin = ?obs.origin,
52        "ingested observation"
53    );
54
55    let _ = state.tx.send(obs);
56
57    (StatusCode::ACCEPTED, axum::Json(json!({"ok": true})))
58}
59
60async fn handle_batch(
61    State(state): State<IngestState>,
62    axum::Json(items): axum::Json<Vec<Value>>,
63) -> (StatusCode, axum::Json<Value>) {
64    let count = items.len();
65
66    for item in items {
67        let obs = normalize::normalize(item);
68        let _ = state.tx.send(obs);
69    }
70
71    tracing::debug!(count, "ingested batch");
72
73    (
74        StatusCode::ACCEPTED,
75        axum::Json(json!({"ok": true, "count": count})),
76    )
77}
78
79#[cfg(test)]
80mod tests {
81    use super::*;
82    use axum::body::Body;
83    use daemon8_types::{ObservationKind, Origin, Severity};
84    use http_body_util::BodyExt;
85    use tower::ServiceExt;
86
87    fn router() -> (Router, tokio::sync::mpsc::UnboundedReceiver<Observation>) {
88        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
89        (ingest_router(tx), rx)
90    }
91
92    #[tokio::test]
93    async fn health_returns_ok() {
94        let (app, _rx) = router();
95
96        let req = axum::http::Request::builder()
97            .uri("/health")
98            .body(Body::empty())
99            .unwrap();
100
101        let resp = app.oneshot(req).await.unwrap();
102
103        assert_eq!(resp.status(), StatusCode::OK);
104
105        let body = resp.into_body().collect().await.unwrap().to_bytes();
106        assert_eq!(&body[..], b"ok");
107    }
108
109    #[tokio::test]
110    async fn ingest_returns_202_and_sends_observation() {
111        let (app, mut rx) = router();
112
113        let payload = json!({
114            "kind": "exception",
115            "severity": "error",
116            "data": { "message": "boom", "trace": "at line 1" },
117            "app": "test-app",
118            "file": "/src/main.rs",
119            "line": 42,
120        });
121
122        let req = axum::http::Request::builder()
123            .method("POST")
124            .uri("/ingest")
125            .header("content-type", "application/json")
126            .body(Body::from(serde_json::to_vec(&payload).unwrap()))
127            .unwrap();
128
129        let resp = app.oneshot(req).await.unwrap();
130        assert_eq!(resp.status(), StatusCode::ACCEPTED);
131
132        let obs = rx.try_recv().expect("should have received an observation");
133        assert!(matches!(obs.kind, ObservationKind::Exception { .. }));
134        assert!(matches!(obs.severity, Severity::Error));
135        assert!(matches!(obs.origin, Origin::Application { ref name } if name == "test-app"));
136        assert_eq!(obs.source_location.as_ref().unwrap().file, "/src/main.rs");
137        assert_eq!(obs.source_location.as_ref().unwrap().line, 42);
138    }
139
140    #[tokio::test]
141    async fn ingest_with_channel_becomes_custom() {
142        let (app, mut rx) = router();
143
144        let payload = json!({
145            "channel": "my-events",
146            "data": { "foo": "bar" },
147        });
148
149        let req = axum::http::Request::builder()
150            .method("POST")
151            .uri("/ingest")
152            .header("content-type", "application/json")
153            .body(Body::from(serde_json::to_vec(&payload).unwrap()))
154            .unwrap();
155
156        let resp = app.oneshot(req).await.unwrap();
157
158        assert_eq!(resp.status(), StatusCode::ACCEPTED);
159
160        let obs = rx.try_recv().unwrap();
161        assert!(
162            matches!(obs.kind, ObservationKind::Custom { ref channel } if channel == "my-events")
163        );
164    }
165
166    #[tokio::test]
167    async fn ingest_bare_json_defaults_to_log() {
168        let (app, mut rx) = router();
169
170        let payload = json!({ "message": "hello world" });
171
172        let req = axum::http::Request::builder()
173            .method("POST")
174            .uri("/ingest")
175            .header("content-type", "application/json")
176            .body(Body::from(serde_json::to_vec(&payload).unwrap()))
177            .unwrap();
178
179        let _ = app.oneshot(req).await.unwrap();
180
181        let obs = rx.try_recv().unwrap();
182        assert!(matches!(obs.kind, ObservationKind::Log));
183        assert!(matches!(obs.severity, Severity::Debug));
184        assert!(matches!(obs.origin, Origin::Application { ref name } if name == "unknown"));
185    }
186
187    #[test]
188    fn parse_severity_variants() {
189        assert!(matches!(
190            normalize::parse_severity(Some("trace")),
191            Severity::Trace
192        ));
193        assert!(matches!(
194            normalize::parse_severity(Some("INFO")),
195            Severity::Info
196        ));
197        assert!(matches!(
198            normalize::parse_severity(Some("warning")),
199            Severity::Warn
200        ));
201        assert!(matches!(
202            normalize::parse_severity(Some("garbage")),
203            Severity::Debug
204        ));
205        assert!(matches!(normalize::parse_severity(None), Severity::Debug));
206    }
207
208    #[tokio::test]
209    async fn batch_ingests_multiple() {
210        let (app, mut rx) = router();
211
212        let payload = json!([
213            {"kind": "log", "data": {"msg": "one"}, "app": "test"},
214            {"kind": "log", "data": {"msg": "two"}, "app": "test"},
215            {"kind": "exception", "data": {"message": "boom"}, "severity": "error"},
216        ]);
217
218        let req = axum::http::Request::builder()
219            .method("POST")
220            .uri("/ingest/batch")
221            .header("content-type", "application/json")
222            .body(Body::from(serde_json::to_vec(&payload).unwrap()))
223            .unwrap();
224
225        let resp = app.oneshot(req).await.unwrap();
226
227        assert_eq!(resp.status(), StatusCode::ACCEPTED);
228
229        let body: Value =
230            serde_json::from_slice(&resp.into_body().collect().await.unwrap().to_bytes()).unwrap();
231
232        assert_eq!(body["count"], 3);
233
234        assert!(rx.try_recv().is_ok());
235        assert!(rx.try_recv().is_ok());
236        let third = rx.try_recv().unwrap();
237        assert!(matches!(third.kind, ObservationKind::Exception { .. }));
238    }
239}