Skip to main content

daemon8_ingest/
lib.rs

// SPDX-License-Identifier: LicenseRef-FCL-1.0-ALv2
// Copyright (c) 2026 Havy.tech, LLC
3
4mod error;
5pub mod normalize;
6pub mod udp;
7#[cfg(unix)]
8pub mod unix;
9
10pub use error::{IngestError, Result};
11
12use axum::{
13    Router,
14    extract::State,
15    http::StatusCode,
16    routing::{get, post},
17};
18use serde_json::{Value, json};
19use tokio::sync::mpsc::UnboundedSender;
20
21use daemon8_types::Observation;
22
23#[derive(Clone)]
24struct IngestState {
25    tx: UnboundedSender<Observation>,
26}
27
28pub fn ingest_router(tx: UnboundedSender<Observation>) -> Router {
29    let state = IngestState { tx };
30
31    Router::new()
32        .route("/ingest", post(handle_ingest))
33        .route("/ingest/batch", post(handle_batch))
34        .route("/health", get(handle_health))
35        .with_state(state)
36}
37
/// Health-check handler: unconditionally returns the static body `ok`.
async fn handle_health() -> &'static str {
    const HEALTHY: &str = "ok";
    HEALTHY
}
41
42async fn handle_ingest(
43    State(state): State<IngestState>,
44    axum::Json(body): axum::Json<Value>,
45) -> (StatusCode, axum::Json<Value>) {
46    let obs = normalize::normalize(body);
47
48    tracing::debug!(
49        kind = ?obs.kind,
50        severity = ?obs.severity,
51        origin = ?obs.origin,
52        "ingested observation"
53    );
54
55    if state.tx.send(obs).is_err() {
56        return (
57            StatusCode::SERVICE_UNAVAILABLE,
58            axum::Json(json!({"ok": false, "error": "ingest channel closed"})),
59        );
60    }
61
62    (StatusCode::ACCEPTED, axum::Json(json!({"ok": true})))
63}
64
65async fn handle_batch(
66    State(state): State<IngestState>,
67    axum::Json(items): axum::Json<Vec<Value>>,
68) -> (StatusCode, axum::Json<Value>) {
69    let count = items.len();
70    let mut dropped = 0usize;
71
72    for item in items {
73        let obs = normalize::normalize(item);
74        if state.tx.send(obs).is_err() {
75            dropped += 1;
76        }
77    }
78
79    tracing::debug!(count, dropped, "ingested batch");
80
81    if dropped > 0 {
82        return (
83            StatusCode::SERVICE_UNAVAILABLE,
84            axum::Json(json!({
85                "ok": false,
86                "error": "ingest channel closed",
87                "count": count,
88                "dropped": dropped,
89            })),
90        );
91    }
92
93    (
94        StatusCode::ACCEPTED,
95        axum::Json(json!({"ok": true, "count": count})),
96    )
97}
98
#[cfg(test)]
mod tests {
    //! Router-level tests: each test drives the router with a single request
    //! via `oneshot` and inspects both the HTTP response and the observations
    //! that arrive on the receiving half of the channel.

    use super::*;
    use axum::body::Body;
    use axum::http::Request;
    use axum::response::Response;
    use daemon8_types::{ObservationKind, Origin, Severity};
    use http_body_util::BodyExt;
    use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel};
    use tower::ServiceExt;

    /// Router wired to a live channel, plus the receiver to inspect output.
    fn router() -> (Router, UnboundedReceiver<Observation>) {
        let (tx, rx) = unbounded_channel();
        (ingest_router(tx), rx)
    }

    /// Router whose receiver has already been dropped, so every send fails.
    fn closed_router() -> Router {
        let (tx, rx) = unbounded_channel::<Observation>();
        let app = ingest_router(tx);
        drop(rx);
        app
    }

    /// Sends `payload` as a JSON `POST` to `uri` and returns the response.
    async fn post_json(app: Router, uri: &str, payload: &Value) -> Response {
        let req = Request::builder()
            .method("POST")
            .uri(uri)
            .header("content-type", "application/json")
            .body(Body::from(serde_json::to_vec(payload).unwrap()))
            .unwrap();

        app.oneshot(req).await.unwrap()
    }

    /// Collects the response body and parses it as JSON.
    async fn json_body(resp: Response) -> Value {
        let bytes = resp.into_body().collect().await.unwrap().to_bytes();
        serde_json::from_slice(&bytes).unwrap()
    }

    #[tokio::test]
    async fn health_returns_ok() {
        let (app, _rx) = router();

        let req = Request::builder()
            .uri("/health")
            .body(Body::empty())
            .unwrap();

        let resp = app.oneshot(req).await.unwrap();
        assert_eq!(resp.status(), StatusCode::OK);

        let body = resp.into_body().collect().await.unwrap().to_bytes();
        assert_eq!(&body[..], b"ok");
    }

    #[tokio::test]
    async fn ingest_returns_202_and_sends_observation() {
        let (app, mut rx) = router();

        let payload = json!({
            "kind": "exception",
            "severity": "error",
            "data": { "message": "boom", "trace": "at line 1" },
            "app": "test-app",
            "file": "/src/main.rs",
            "line": 42,
        });

        let resp = post_json(app, "/ingest", &payload).await;
        assert_eq!(resp.status(), StatusCode::ACCEPTED);

        let obs = rx.try_recv().expect("should have received an observation");
        assert!(matches!(obs.kind, ObservationKind::Exception { .. }));
        assert!(matches!(obs.severity, Severity::Error));
        assert!(matches!(obs.origin, Origin::Application { ref name } if name == "test-app"));

        let location = obs.source_location.as_ref().unwrap();
        assert_eq!(location.file, "/src/main.rs");
        assert_eq!(location.line, 42);
    }

    #[tokio::test]
    async fn ingest_with_channel_becomes_custom() {
        let (app, mut rx) = router();

        let payload = json!({
            "channel": "my-events",
            "data": { "foo": "bar" },
        });

        let resp = post_json(app, "/ingest", &payload).await;
        assert_eq!(resp.status(), StatusCode::ACCEPTED);

        let obs = rx.try_recv().unwrap();
        assert!(
            matches!(obs.kind, ObservationKind::Custom { ref channel } if channel == "my-events")
        );
    }

    #[tokio::test]
    async fn ingest_bare_json_defaults_to_log() {
        let (app, mut rx) = router();

        let payload = json!({ "message": "hello world" });

        // Check the status too, so a rejected request fails here with a clear
        // message instead of at the `try_recv` below.
        let resp = post_json(app, "/ingest", &payload).await;
        assert_eq!(resp.status(), StatusCode::ACCEPTED);

        let obs = rx.try_recv().unwrap();
        assert!(matches!(obs.kind, ObservationKind::Log));
        assert!(matches!(obs.severity, Severity::Debug));
        assert!(matches!(obs.origin, Origin::Application { ref name } if name == "unknown"));
    }

    #[test]
    fn parse_severity_variants() {
        assert!(matches!(
            normalize::parse_severity(Some("trace")),
            Severity::Trace
        ));
        assert!(matches!(
            normalize::parse_severity(Some("INFO")),
            Severity::Info
        ));
        assert!(matches!(
            normalize::parse_severity(Some("warning")),
            Severity::Warn
        ));
        assert!(matches!(
            normalize::parse_severity(Some("garbage")),
            Severity::Debug
        ));
        assert!(matches!(normalize::parse_severity(None), Severity::Debug));
    }

    #[tokio::test]
    async fn ingest_returns_503_when_channel_closed() {
        let payload = json!({
            "kind": "log",
            "data": { "msg": "shutdown probe" },
            "app": "test-app",
        });

        let resp = post_json(closed_router(), "/ingest", &payload).await;
        assert_eq!(resp.status(), StatusCode::SERVICE_UNAVAILABLE);

        let body = json_body(resp).await;
        assert_eq!(body["ok"], false);
        assert_eq!(body["error"], "ingest channel closed");
    }

    #[tokio::test]
    async fn batch_returns_503_when_channel_closed() {
        let payload = json!([
            {"kind": "log", "data": {"msg": "one"}, "app": "test"},
            {"kind": "log", "data": {"msg": "two"}, "app": "test"},
        ]);

        let resp = post_json(closed_router(), "/ingest/batch", &payload).await;
        assert_eq!(resp.status(), StatusCode::SERVICE_UNAVAILABLE);

        let body = json_body(resp).await;
        assert_eq!(body["ok"], false);
        assert_eq!(body["error"], "ingest channel closed");
        assert_eq!(body["count"], 2);
        assert_eq!(body["dropped"], 2);
    }

    #[tokio::test]
    async fn batch_ingests_multiple() {
        let (app, mut rx) = router();

        let payload = json!([
            {"kind": "log", "data": {"msg": "one"}, "app": "test"},
            {"kind": "log", "data": {"msg": "two"}, "app": "test"},
            {"kind": "exception", "data": {"message": "boom"}, "severity": "error"},
        ]);

        let resp = post_json(app, "/ingest/batch", &payload).await;
        assert_eq!(resp.status(), StatusCode::ACCEPTED);

        let body = json_body(resp).await;
        assert_eq!(body["ok"], true);
        assert_eq!(body["count"], 3);

        // Observations arrive in submission order; the third is the exception.
        assert!(rx.try_recv().is_ok());
        assert!(rx.try_recv().is_ok());
        let third = rx.try_recv().unwrap();
        assert!(matches!(third.kind, ObservationKind::Exception { .. }));
    }
}