1mod error;
5pub mod normalize;
6pub mod udp;
7#[cfg(unix)]
8pub mod unix;
9
10pub use error::{IngestError, Result};
11
12use axum::{
13 Router,
14 extract::State,
15 http::StatusCode,
16 routing::{get, post},
17};
18use serde_json::{Value, json};
19use tokio::sync::mpsc::UnboundedSender;
20
21use daemon8_types::Observation;
22
23#[derive(Clone)]
24struct IngestState {
25 tx: UnboundedSender<Observation>,
26}
27
28pub fn ingest_router(tx: UnboundedSender<Observation>) -> Router {
29 let state = IngestState { tx };
30
31 Router::new()
32 .route("/ingest", post(handle_ingest))
33 .route("/ingest/batch", post(handle_batch))
34 .route("/health", get(handle_health))
35 .with_state(state)
36}
37
38async fn handle_health() -> &'static str {
39 "ok"
40}
41
42async fn handle_ingest(
43 State(state): State<IngestState>,
44 axum::Json(body): axum::Json<Value>,
45) -> (StatusCode, axum::Json<Value>) {
46 let obs = normalize::normalize(body);
47
48 tracing::debug!(
49 kind = ?obs.kind,
50 severity = ?obs.severity,
51 origin = ?obs.origin,
52 "ingested observation"
53 );
54
55 let _ = state.tx.send(obs);
56
57 (StatusCode::ACCEPTED, axum::Json(json!({"ok": true})))
58}
59
60async fn handle_batch(
61 State(state): State<IngestState>,
62 axum::Json(items): axum::Json<Vec<Value>>,
63) -> (StatusCode, axum::Json<Value>) {
64 let count = items.len();
65
66 for item in items {
67 let obs = normalize::normalize(item);
68 let _ = state.tx.send(obs);
69 }
70
71 tracing::debug!(count, "ingested batch");
72
73 (
74 StatusCode::ACCEPTED,
75 axum::Json(json!({"ok": true, "count": count})),
76 )
77}
78
79#[cfg(test)]
80mod tests {
81 use super::*;
82 use axum::body::Body;
83 use daemon8_types::{ObservationKind, Origin, Severity};
84 use http_body_util::BodyExt;
85 use tower::ServiceExt;
86
87 fn router() -> (Router, tokio::sync::mpsc::UnboundedReceiver<Observation>) {
88 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
89 (ingest_router(tx), rx)
90 }
91
92 #[tokio::test]
93 async fn health_returns_ok() {
94 let (app, _rx) = router();
95
96 let req = axum::http::Request::builder()
97 .uri("/health")
98 .body(Body::empty())
99 .unwrap();
100
101 let resp = app.oneshot(req).await.unwrap();
102
103 assert_eq!(resp.status(), StatusCode::OK);
104
105 let body = resp.into_body().collect().await.unwrap().to_bytes();
106 assert_eq!(&body[..], b"ok");
107 }
108
109 #[tokio::test]
110 async fn ingest_returns_202_and_sends_observation() {
111 let (app, mut rx) = router();
112
113 let payload = json!({
114 "kind": "exception",
115 "severity": "error",
116 "data": { "message": "boom", "trace": "at line 1" },
117 "app": "test-app",
118 "file": "/src/main.rs",
119 "line": 42,
120 });
121
122 let req = axum::http::Request::builder()
123 .method("POST")
124 .uri("/ingest")
125 .header("content-type", "application/json")
126 .body(Body::from(serde_json::to_vec(&payload).unwrap()))
127 .unwrap();
128
129 let resp = app.oneshot(req).await.unwrap();
130 assert_eq!(resp.status(), StatusCode::ACCEPTED);
131
132 let obs = rx.try_recv().expect("should have received an observation");
133 assert!(matches!(obs.kind, ObservationKind::Exception { .. }));
134 assert!(matches!(obs.severity, Severity::Error));
135 assert!(matches!(obs.origin, Origin::Application { ref name } if name == "test-app"));
136 assert_eq!(obs.source_location.as_ref().unwrap().file, "/src/main.rs");
137 assert_eq!(obs.source_location.as_ref().unwrap().line, 42);
138 }
139
140 #[tokio::test]
141 async fn ingest_with_channel_becomes_custom() {
142 let (app, mut rx) = router();
143
144 let payload = json!({
145 "channel": "my-events",
146 "data": { "foo": "bar" },
147 });
148
149 let req = axum::http::Request::builder()
150 .method("POST")
151 .uri("/ingest")
152 .header("content-type", "application/json")
153 .body(Body::from(serde_json::to_vec(&payload).unwrap()))
154 .unwrap();
155
156 let resp = app.oneshot(req).await.unwrap();
157
158 assert_eq!(resp.status(), StatusCode::ACCEPTED);
159
160 let obs = rx.try_recv().unwrap();
161 assert!(
162 matches!(obs.kind, ObservationKind::Custom { ref channel } if channel == "my-events")
163 );
164 }
165
166 #[tokio::test]
167 async fn ingest_bare_json_defaults_to_log() {
168 let (app, mut rx) = router();
169
170 let payload = json!({ "message": "hello world" });
171
172 let req = axum::http::Request::builder()
173 .method("POST")
174 .uri("/ingest")
175 .header("content-type", "application/json")
176 .body(Body::from(serde_json::to_vec(&payload).unwrap()))
177 .unwrap();
178
179 let _ = app.oneshot(req).await.unwrap();
180
181 let obs = rx.try_recv().unwrap();
182 assert!(matches!(obs.kind, ObservationKind::Log));
183 assert!(matches!(obs.severity, Severity::Debug));
184 assert!(matches!(obs.origin, Origin::Application { ref name } if name == "unknown"));
185 }
186
187 #[test]
188 fn parse_severity_variants() {
189 assert!(matches!(
190 normalize::parse_severity(Some("trace")),
191 Severity::Trace
192 ));
193 assert!(matches!(
194 normalize::parse_severity(Some("INFO")),
195 Severity::Info
196 ));
197 assert!(matches!(
198 normalize::parse_severity(Some("warning")),
199 Severity::Warn
200 ));
201 assert!(matches!(
202 normalize::parse_severity(Some("garbage")),
203 Severity::Debug
204 ));
205 assert!(matches!(normalize::parse_severity(None), Severity::Debug));
206 }
207
208 #[tokio::test]
209 async fn batch_ingests_multiple() {
210 let (app, mut rx) = router();
211
212 let payload = json!([
213 {"kind": "log", "data": {"msg": "one"}, "app": "test"},
214 {"kind": "log", "data": {"msg": "two"}, "app": "test"},
215 {"kind": "exception", "data": {"message": "boom"}, "severity": "error"},
216 ]);
217
218 let req = axum::http::Request::builder()
219 .method("POST")
220 .uri("/ingest/batch")
221 .header("content-type", "application/json")
222 .body(Body::from(serde_json::to_vec(&payload).unwrap()))
223 .unwrap();
224
225 let resp = app.oneshot(req).await.unwrap();
226
227 assert_eq!(resp.status(), StatusCode::ACCEPTED);
228
229 let body: Value =
230 serde_json::from_slice(&resp.into_body().collect().await.unwrap().to_bytes()).unwrap();
231
232 assert_eq!(body["count"], 3);
233
234 assert!(rx.try_recv().is_ok());
235 assert!(rx.try_recv().is_ok());
236 let third = rx.try_recv().unwrap();
237 assert!(matches!(third.kind, ObservationKind::Exception { .. }));
238 }
239}