Skip to main content

ordinary_utils/
events.rs

1// Copyright (C) 2026 Ordinary Labs, LLC.
2//
3// SPDX-License-Identifier: AGPL-3.0-only
4
5use crate::json::JsonValuable;
6use async_compression::tokio::write::ZlibDecoder;
7use axum::Router;
8use axum::http::StatusCode;
9use axum::routing::post;
10use bytes::Bytes;
11use serde::{Deserialize, Serialize};
12use std::collections::BTreeMap;
13use tokio::io::AsyncWriteExt;
14
15#[derive(Serialize, Deserialize, Debug)]
16pub struct ClientEvent {
17    /// timestamp
18    pub ts: String,
19    /// timezone
20    pub tz: String,
21    /// log level
22    #[serde(skip_serializing)]
23    pub lvl: String,
24    /// version
25    pub v: String,
26    /// correlation id
27    pub cid: String,
28    /// memory id
29    pub mid: String,
30    /// path
31    pub p: String,
32    #[serde(skip_serializing_if = "Option::is_none")]
33    #[serde(default)]
34    /// query
35    pub q: Option<String>,
36    #[serde(skip_serializing_if = "Option::is_none")]
37    #[serde(default)]
38    /// fragment
39    pub f: Option<String>,
40    #[serde(skip_serializing_if = "Option::is_none")]
41    #[serde(default)]
42    /// flags
43    pub flg: Option<BTreeMap<String, String>>,
44    #[serde(skip_serializing_if = "Option::is_none")]
45    #[serde(default)]
46    /// fields
47    pub fld: Option<BTreeMap<String, String>>,
48    #[serde(skip_serializing)]
49    /// message
50    pub msg: String,
51}
52
53#[must_use]
54#[allow(clippy::ref_option)]
55pub fn setup_routes<S>(client_events: &Option<bool>) -> Option<Router<S>>
56where
57    S: Clone + Send + Sync + 'static,
58{
59    if client_events == &Some(true) {
60        let router = Router::new().route(
61            "/.ordinary/v1/events",
62            post(|body: Bytes| async move {
63                let mut decoder = ZlibDecoder::new(Vec::new());
64                if let Err(err) = decoder.write_all(body.as_ref()).await {
65                    tracing::error!(%err);
66                    return StatusCode::INTERNAL_SERVER_ERROR;
67                }
68                if let Err(err) = decoder.shutdown().await {
69                    tracing::error!(%err);
70                    return StatusCode::INTERNAL_SERVER_ERROR;
71                }
72
73                let mut events_slice = decoder.into_inner();
74
75                let events: Vec<ClientEvent> = match simd_json::from_slice(&mut events_slice) {
76                    Ok(parsed) => parsed,
77                    Err(err) => {
78                        tracing::error!(%err);
79                        return StatusCode::INTERNAL_SERVER_ERROR;
80                    }
81                };
82
83                let client_span = tracing::info_span!("client");
84
85                client_span.in_scope(|| {
86                    for event in events {
87                        #[cfg(tracing_unstable)]
88                        {
89                            let level = event.lvl.as_str();
90                            let message = event.msg.as_str();
91
92                            if let Ok(value) = serde_json::to_value(&event) {
93                                let event = JsonValuable(value);
94
95                                match level {
96                                    "TRACE" => tracing::trace!(
97                                        evt = tracing::field::valuable(&event),
98                                        "{message}"
99                                    ),
100                                    "DEBUG" => tracing::debug!(
101                                        evt = tracing::field::valuable(&event),
102                                        "{message}"
103                                    ),
104                                    "INFO" => {
105                                        tracing::info!(
106                                            evt = tracing::field::valuable(&event),
107                                            "{message}"
108                                        );
109                                    }
110                                    "WARN" => {
111                                        tracing::warn!(
112                                            evt = tracing::field::valuable(&event),
113                                            "{message}"
114                                        );
115                                    }
116                                    _ => tracing::error!(
117                                        evt = tracing::field::valuable(&event),
118                                        "{message}"
119                                    ),
120                                }
121                            }
122                        }
123
124                        #[cfg(not(tracing_unstable))]
125                        {
126                            match event.lvl.as_ref() {
127                                "TRACE" => tracing::trace!(evt = debug(&event)),
128                                "DEBUG" => tracing::debug!(evt = debug(&event)),
129                                "INFO" => tracing::info!(evt = debug(&event)),
130                                "WARN" => tracing::warn!(evt = debug(&event)),
131                                _ => tracing::error!(evt = debug(&event)),
132                            }
133                        }
134                    }
135                });
136
137                StatusCode::OK
138            }),
139        );
140        return Some(router);
141    }
142
143    None
144}