1mod error;
5pub mod normalize;
6pub mod udp;
7#[cfg(unix)]
8pub mod unix;
9
10pub use error::{IngestError, Result};
11
12use axum::{
13 Router,
14 extract::State,
15 http::StatusCode,
16 routing::{get, post},
17};
18use serde_json::{Value, json};
19use tokio::sync::mpsc::UnboundedSender;
20
21use daemon8_types::Observation;
22
23#[derive(Clone)]
24struct IngestState {
25 tx: UnboundedSender<Observation>,
26}
27
28pub fn ingest_router(tx: UnboundedSender<Observation>) -> Router {
29 let state = IngestState { tx };
30
31 Router::new()
32 .route("/ingest", post(handle_ingest))
33 .route("/ingest/batch", post(handle_batch))
34 .route("/health", get(handle_health))
35 .with_state(state)
36}
37
/// Health probe handler: always answers with the static body `ok`.
async fn handle_health() -> &'static str {
    const HEALTHY: &str = "ok";
    HEALTHY
}
41
42async fn handle_ingest(
43 State(state): State<IngestState>,
44 axum::Json(body): axum::Json<Value>,
45) -> (StatusCode, axum::Json<Value>) {
46 let obs = normalize::normalize(body);
47
48 tracing::debug!(
49 kind = ?obs.kind,
50 severity = ?obs.severity,
51 origin = ?obs.origin,
52 "ingested observation"
53 );
54
55 if state.tx.send(obs).is_err() {
56 return (
57 StatusCode::SERVICE_UNAVAILABLE,
58 axum::Json(json!({"ok": false, "error": "ingest channel closed"})),
59 );
60 }
61
62 (StatusCode::ACCEPTED, axum::Json(json!({"ok": true})))
63}
64
65async fn handle_batch(
66 State(state): State<IngestState>,
67 axum::Json(items): axum::Json<Vec<Value>>,
68) -> (StatusCode, axum::Json<Value>) {
69 let count = items.len();
70 let mut dropped = 0usize;
71
72 for item in items {
73 let obs = normalize::normalize(item);
74 if state.tx.send(obs).is_err() {
75 dropped += 1;
76 }
77 }
78
79 tracing::debug!(count, dropped, "ingested batch");
80
81 if dropped > 0 {
82 return (
83 StatusCode::SERVICE_UNAVAILABLE,
84 axum::Json(json!({
85 "ok": false,
86 "error": "ingest channel closed",
87 "count": count,
88 "dropped": dropped,
89 })),
90 );
91 }
92
93 (
94 StatusCode::ACCEPTED,
95 axum::Json(json!({"ok": true, "count": count})),
96 )
97}
98
#[cfg(test)]
mod tests {
    use super::*;
    use axum::body::Body;
    use daemon8_types::{ObservationKind, Origin, Severity};
    use http_body_util::BodyExt;
    use tower::ServiceExt;

    // Router wired to a live channel, plus the receiving half for assertions.
    fn router() -> (Router, tokio::sync::mpsc::UnboundedReceiver<Observation>) {
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
        (ingest_router(tx), rx)
    }

    // Router whose receiver has already been dropped, so every send fails.
    fn closed_router() -> Router {
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<Observation>();
        drop(rx);
        ingest_router(tx)
    }

    // Sends `payload` as a JSON POST to `uri` and returns the response.
    async fn post_json(app: Router, uri: &str, payload: &Value) -> axum::response::Response {
        let req = axum::http::Request::builder()
            .method("POST")
            .uri(uri)
            .header("content-type", "application/json")
            .body(Body::from(serde_json::to_vec(payload).unwrap()))
            .unwrap();

        app.oneshot(req).await.unwrap()
    }

    // Collects the response body and parses it as JSON.
    async fn json_body(resp: axum::response::Response) -> Value {
        let bytes = resp.into_body().collect().await.unwrap().to_bytes();
        serde_json::from_slice(&bytes).unwrap()
    }

    #[tokio::test]
    async fn health_returns_ok() {
        let (app, _rx) = router();

        let req = axum::http::Request::builder()
            .uri("/health")
            .body(Body::empty())
            .unwrap();

        let resp = app.oneshot(req).await.unwrap();
        assert_eq!(resp.status(), StatusCode::OK);

        let body = resp.into_body().collect().await.unwrap().to_bytes();
        assert_eq!(&body[..], b"ok");
    }

    #[tokio::test]
    async fn ingest_returns_202_and_sends_observation() {
        let (app, mut rx) = router();

        let payload = json!({
            "kind": "exception",
            "severity": "error",
            "data": { "message": "boom", "trace": "at line 1" },
            "app": "test-app",
            "file": "/src/main.rs",
            "line": 42,
        });

        let resp = post_json(app, "/ingest", &payload).await;
        assert_eq!(resp.status(), StatusCode::ACCEPTED);

        let obs = rx.try_recv().expect("should have received an observation");
        assert!(matches!(obs.kind, ObservationKind::Exception { .. }));
        assert!(matches!(obs.severity, Severity::Error));
        assert!(matches!(obs.origin, Origin::Application { ref name } if name == "test-app"));
        assert_eq!(obs.source_location.as_ref().unwrap().file, "/src/main.rs");
        assert_eq!(obs.source_location.as_ref().unwrap().line, 42);
    }

    #[tokio::test]
    async fn ingest_with_channel_becomes_custom() {
        let (app, mut rx) = router();

        let payload = json!({
            "channel": "my-events",
            "data": { "foo": "bar" },
        });

        let resp = post_json(app, "/ingest", &payload).await;
        assert_eq!(resp.status(), StatusCode::ACCEPTED);

        let obs = rx.try_recv().unwrap();
        assert!(
            matches!(obs.kind, ObservationKind::Custom { ref channel } if channel == "my-events")
        );
    }

    #[tokio::test]
    async fn ingest_bare_json_defaults_to_log() {
        let (app, mut rx) = router();

        let payload = json!({ "message": "hello world" });

        // Check the status too, rather than discarding the response.
        let resp = post_json(app, "/ingest", &payload).await;
        assert_eq!(resp.status(), StatusCode::ACCEPTED);

        let obs = rx.try_recv().unwrap();
        assert!(matches!(obs.kind, ObservationKind::Log));
        assert!(matches!(obs.severity, Severity::Debug));
        assert!(matches!(obs.origin, Origin::Application { ref name } if name == "unknown"));
    }

    #[test]
    fn parse_severity_variants() {
        assert!(matches!(
            normalize::parse_severity(Some("trace")),
            Severity::Trace
        ));
        assert!(matches!(
            normalize::parse_severity(Some("INFO")),
            Severity::Info
        ));
        assert!(matches!(
            normalize::parse_severity(Some("warning")),
            Severity::Warn
        ));
        assert!(matches!(
            normalize::parse_severity(Some("garbage")),
            Severity::Debug
        ));
        assert!(matches!(normalize::parse_severity(None), Severity::Debug));
    }

    #[tokio::test]
    async fn ingest_returns_503_when_channel_closed() {
        let app = closed_router();

        let payload = json!({
            "kind": "log",
            "data": { "msg": "shutdown probe" },
            "app": "test-app",
        });

        let resp = post_json(app, "/ingest", &payload).await;
        assert_eq!(resp.status(), StatusCode::SERVICE_UNAVAILABLE);

        let body = json_body(resp).await;
        assert_eq!(body["ok"], false);
        assert_eq!(body["error"], "ingest channel closed");
    }

    #[tokio::test]
    async fn batch_returns_503_when_channel_closed() {
        let app = closed_router();

        let payload = json!([
            {"kind": "log", "data": {"msg": "one"}, "app": "test"},
            {"kind": "log", "data": {"msg": "two"}, "app": "test"},
        ]);

        let resp = post_json(app, "/ingest/batch", &payload).await;
        assert_eq!(resp.status(), StatusCode::SERVICE_UNAVAILABLE);

        let body = json_body(resp).await;
        assert_eq!(body["ok"], false);
        assert_eq!(body["error"], "ingest channel closed");
        assert_eq!(body["count"], 2);
        assert_eq!(body["dropped"], 2);
    }

    #[tokio::test]
    async fn batch_ingests_multiple() {
        let (app, mut rx) = router();

        let payload = json!([
            {"kind": "log", "data": {"msg": "one"}, "app": "test"},
            {"kind": "log", "data": {"msg": "two"}, "app": "test"},
            {"kind": "exception", "data": {"message": "boom"}, "severity": "error"},
        ]);

        let resp = post_json(app, "/ingest/batch", &payload).await;
        assert_eq!(resp.status(), StatusCode::ACCEPTED);

        let body = json_body(resp).await;
        assert_eq!(body["ok"], true);
        assert_eq!(body["count"], 3);

        assert!(rx.try_recv().is_ok());
        assert!(rx.try_recv().is_ok());
        let third = rx.try_recv().unwrap();
        assert!(matches!(third.kind, ObservationKind::Exception { .. }));

        // Exactly three observations should have been queued.
        assert!(rx.try_recv().is_err());
    }
}