telemetry-server 0.1.0

Simple receiver of telemetry over HTTP POST or WebSocket, storing to SQLite, Postgres, DuckDB, or JSON files
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
mod conn;
mod stream_id;
#[cfg(test)]
mod tests;

use conn::*;
use stream_id::StreamId;
use Error::*;

use anyhow::{anyhow, Context, Result};
use axum::body::Bytes;
use axum::extract::ws::{Message, WebSocket};
use axum::extract::WebSocketUpgrade;
use axum::http::{HeaderMap, StatusCode};
use clap::Parser;
use futures::FutureExt;
use futures::{future, select_biased, TryFutureExt};
use futures::{Stream, StreamExt};
use std::fmt::{Debug, Display, Formatter};
use std::future::{poll_fn, Future, IntoFuture};
use std::io::Write;
use std::pin::pin;
use std::sync::Arc;
use std::task::Poll;
use tokio::signal::ctrl_c;
use tokio::signal::unix::SignalKind;
use tokio::sync::Mutex;
use tracing::*;

// Command-line arguments, parsed by clap. The storage backend is chosen via a
// subcommand. Plain `//` comments are deliberate: `///` doc comments on clap
// derive items would become `--help` text and change runtime output.
#[derive(clap::Parser)]
struct Args {
    // Which storage backend to open, with its backend-specific options.
    #[command(subcommand)]
    storage: Storage,
}

// The available storage backends, each compiled in behind a cargo feature.
// Each variant carries the clap options needed to open that backend. Plain
// `//` comments are deliberate: `///` doc comments on clap derive items would
// become `--help` text and change runtime output.
#[derive(Clone, clap::Subcommand)]
enum Storage {
    #[cfg(feature = "local")]
    Sqlite(local::SqliteOpen),
    #[cfg(feature = "local")]
    DuckDB(local::DuckDbOpen),
    #[cfg(feature = "local")]
    JsonFiles(local::JsonFilesOpen),
    #[cfg(feature = "postgres")]
    Postgres(postgres::PostgresOpener),
}

impl Storage {
    /// Opens the selected backend and returns it as a boxed `Connection` trait
    /// object so callers can use any backend uniformly.
    pub(crate) async fn open(self) -> Result<Box<dyn Connection + Send>> {
        // Moving the box/dyn stuff into the trait doesn't seem better. Rust doesn't let me dispatch
        // over enums that all implement the same trait?
        match self {
            #[cfg(feature = "local")]
            Storage::Sqlite(open) => Self::do_open(open).await,
            #[cfg(feature = "local")]
            Storage::DuckDB(open) => Self::do_open(open).await,
            #[cfg(feature = "local")]
            Storage::JsonFiles(open) => Self::do_open(open).await,
            #[cfg(feature = "postgres")]
            Storage::Postgres(open) => Self::do_open(open).await,
        }
    }

    /// Shared open-and-box helper, monomorphized per opener type; erases the
    /// concrete connection type behind `dyn Connection`.
    async fn do_open<O>(opener: O) -> Result<Box<dyn Connection + Send>>
    where
        O: StorageOpen,
        <O as StorageOpen>::Conn: Connection + 'static,
    {
        Ok(Box::new(opener.open().await?))
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    // Log lines look like "[timestamp LEVEL target] message", with the level
    // coloured and the timestamp in local time.
    // NOTE(review): handlers use `tracing` macros while env_logger backs the
    // `log` crate — presumably a tracing->log bridge is enabled elsewhere;
    // confirm.
    env_logger::Builder::from_default_env()
        .format(|fmt, record| {
            let level_style = fmt.default_level_style(record.level());
            let localtime = chrono::Local::now();
            writeln!(
                fmt,
                "[{} {level_style}{}{level_style:#} {}] {}",
                localtime,
                record.level(),
                record.target(),
                record.args()
            )
        })
        .init();
    debug!(test_arg = "hi mum", "debug level test");
    let args = Args::parse();
    let db_conn = args.storage.open().await?;
    // Whether this backend wants SIGINT to trigger a commit instead of
    // terminating the process. Read before the connection is moved behind the
    // mutex; it also decides whether SIGINT joins the termination signal set.
    let commit_on_sigint = db_conn.commit_on_sigint();
    let db_conn = Arc::new(Mutex::new(db_conn));

    // This catches signals that trigger commit. Spin it up even if not committing on sigint to
    // ensure all behaviours are handled correctly.
    tokio::spawn({
        let db_conn = db_conn.clone();
        async move {
            if !db_conn.lock().await.commit_on_sigint() {
                // Park forever so this task never claims ctrl_c notifications.
                std::future::pending::<()>().await;
                return;
            }
            loop {
                ctrl_c().await.unwrap();
                log_commit(&mut **db_conn.lock().await).unwrap();
            }
        }
    });

    let server = Arc::new(Server { db_conn });
    // TODO: Catch a signal or handle an endpoint that triggers the db conn to be committed. Also do
    // this on a timer.
    let tower_layer = tower_http::trace::TraceLayer::new_for_http()
        .make_span_with(tower_http::trace::DefaultMakeSpan::new().include_headers(true))
        // Unit handlers disable the layer's default per-request and
        // per-body-chunk log events.
        .on_request(())
        .on_body_chunk(());
    let app = axum::Router::new()
        .route(
            "/",
            axum::routing::post({
                let server = Arc::clone(&server);
                move |body| async move { server.post_handler(body).await }
            }),
        )
        .route(
            "/",
            // A GET on the same path upgrades to a websocket stream.
            axum::routing::get({
                let server = Arc::clone(&server);
                |ws_upgrade: WebSocketUpgrade, headers: HeaderMap| async move {
                    ws_upgrade.on_upgrade(move |ws| async move {
                        server.websocket_handler(ws, &headers).await
                    })
                }
            }),
        )
        .layer(tower_layer);
    // This is just the OTLP/HTTP port, because if we're using this we're probably not using OTLP. I
    // want this to bind dual stack, but I don't see any obvious way to do it with one call.
    let listener = tokio::net::TcpListener::bind("[::]:4318").await?;
    let listener_local_addr = listener.local_addr()?;
    info!(?listener_local_addr, "serving http");
    let http_server = axum::serve(listener, app)
        .into_future()
        .map_err(anyhow::Error::from);
    // Run until either the server fails or a terminating signal arrives,
    // returning whichever result finished first.
    let term_sigs = pin!(handle_main_signals(commit_on_sigint)?);
    let either = future::select(http_server, term_sigs).await;
    either.factor_first().0
}

/// Builds a future that completes when a process-terminating signal arrives,
/// logging which signal it was.
///
/// SIGQUIT and SIGTERM are always watched; SIGINT is included only when it is
/// not reserved for triggering commits (`commit_on_sigint`).
fn handle_main_signals(commit_on_sigint: bool) -> Result<impl Future<Output = Result<()>>> {
    let mut watched = Vec::new();
    if !commit_on_sigint {
        // SIGINT terminates only when it isn't being used as the commit trigger.
        watched.push(Box::pin(signal("SIGINT", SignalKind::interrupt())?));
    }
    // On Windows we probably wouldn't have this one. Also what about the fact this is normally
    // for user detected errors?
    watched.push(Box::pin(signal("SIGQUIT", SignalKind::quit())?));
    watched.push(Box::pin(signal("SIGTERM", SignalKind::terminate())?));
    Ok(async move {
        // select_all resolves to (output, index, remaining futures); the output
        // here is the (name, Option<()>) pair produced by `signal`.
        let ((signal_name, _), _index, _remaining) = future::select_all(watched).await;
        warn!(signal_name, "received terminating main signal");
        Ok(())
    })
}

/// Installs a listener for the given Unix signal and returns a future that
/// resolves to the signal's name together with `recv`'s result (which is
/// `None` if the signal stream can no longer receive).
fn signal(name: &str, kind: SignalKind) -> Result<impl Future<Output = (&str, Option<()>)>> {
    let mut stream = tokio::signal::unix::signal(kind)?;
    Ok(async move {
        let received = stream.recv().await;
        (name, received)
    })
}

fn log_commit(conn: &mut (impl Connection + ?Sized)) -> Result<()> {
    let res = conn.commit();
    match &res {
        Ok(()) => info!("committed"),
        Err(err) => error!(%err, "committing"),
    };
    res
}

/// Request headers serialized to JSON for storage alongside a stream.
/// NOTE(review): not referenced in this file — presumably used by the `conn`
/// module; confirm before removing.
type SerializedHeaders = serde_json::Value;

/// Index of an event within a single stream.
type StreamEventIndex = u64;

/// Shared application state: the single database connection, behind an async
/// mutex so concurrent HTTP and websocket handlers serialize access to it.
struct Server {
    db_conn: Arc<Mutex<Box<dyn Connection + Send>>>,
}

/// Incrementally parses a chunked byte stream as a sequence of concatenated
/// JSON values, calling `on_payload` with the raw bytes of each complete value.
///
/// Bytes are buffered across chunks, so a JSON value split over chunk
/// boundaries is parsed once enough bytes arrive. On failure, returns the
/// error together with the HTTP status to report: `INTERNAL_SERVER_ERROR` for
/// body-stream or handler errors, `BAD_REQUEST` for invalid JSON — including a
/// value still incomplete when the stream ends.
async fn iter_json_stream<F>(
    mut body_data_stream: impl Stream<Item = Result<Bytes, axum::Error>> + Unpin,
    mut on_payload: impl FnMut(Vec<u8>) -> F,
) -> Result<(), (anyhow::Error, StatusCode)>
where
    F: Future<Output = Result<()>>,
{
    // Unconsumed bytes accumulated across stream chunks.
    let mut bytes = vec![];
    // The most recent "unexpected end of input" error; still `Some` when the
    // stream finishes means the body ended mid-value.
    let mut last_eof_error = None;
    while let Some(result) = body_data_stream.next().await {
        let new_bytes = match result {
            Err(err) => {
                return Err((
                    anyhow::Error::from(err).context("error in body data stream"),
                    StatusCode::INTERNAL_SERVER_ERROR,
                ));
            }
            Ok(ok) => ok,
        };
        bytes.extend_from_slice(&new_bytes);
        // Iterate through JSON objects without allocating anything.
        let mut json_stream_deserializer = serde_json::Deserializer::from_slice(bytes.as_slice())
            .into_iter::<serde::de::IgnoredAny>();
        // Byte offset just past the last fully parsed value.
        let mut last_offset = 0;
        while let Some(result) = json_stream_deserializer.next() {
            match result {
                Err(err) if err.is_eof() => {
                    // Value incomplete: remember the error and wait for more bytes.
                    last_eof_error = Some(err);
                    break;
                }
                Err(err) => {
                    error!(?err, "error deserializing json value");
                    return Err((
                        anyhow!(err).context("deserializing json value"),
                        StatusCode::BAD_REQUEST,
                    ));
                }
                Ok(serde::de::IgnoredAny) => {
                    last_eof_error = None;
                    let value_end_offset = json_stream_deserializer.byte_offset();
                    // Hand the raw bytes of exactly this value to the handler.
                    let payload = bytes[last_offset..value_end_offset].to_vec();
                    if let Err(err) = on_payload(payload).await {
                        return Err((
                            err.context("handling payload"),
                            StatusCode::INTERNAL_SERVER_ERROR,
                        ));
                    }
                    last_offset = value_end_offset;
                }
            }
        }
        trace!(last_offset, "draining bytes to offset");
        // Drop consumed bytes; any partial trailing value is kept for the next chunk.
        bytes.drain(..last_offset);
    }
    last_eof_error
        .map(|eof_err| Err((anyhow!(eof_err), StatusCode::BAD_REQUEST)))
        .unwrap_or(Ok(()))
}

/// Whether the caller should keep reading messages from a stream.
enum StreamRetry {
    /// Keep receiving messages.
    More,
    /// The stream is finished; stop receiving.
    Stop,
}

/// Errors from running a websocket stream, split by origin so callers can log
/// receive failures (usually the peer's fault) differently from handling
/// failures (server-side problems).
#[derive(Debug)]
enum Error {
    /// Failed to receive a message from the websocket.
    Recv(axum::Error),
    /// Failed while processing a successfully received message.
    Handle(anyhow::Error),
}

impl Display for Error {
    /// Formats the error by delegating to the wrapped error's `Display` impl.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            Recv(inner) => write!(f, "{}", inner),
            Handle(inner) => write!(f, "{}", inner),
        }
    }
}

impl Server {
    async fn websocket_handler(&self, websocket: WebSocket, headers: &HeaderMap) {
        if let Err(err) = self.websocket_handler_err(websocket, headers).await {
            match err {
                Recv(err) => {
                    debug!(?err, "receiving message");
                }
                Handle(err) => {
                    error!(?err, "handling message");
                }
            }
        }
    }

    async fn handle_message(
        &self,
        message: Message,
        stream_id: StreamId,
        stream_event_index: &mut StreamEventIndex,
    ) -> Result<StreamRetry> {
        match message {
            Message::Close(reason) => {
                debug!(%stream_id, ?reason, "websocket closed");
                // Not sure if we should act on this or let the next recv return None?
                Ok(StreamRetry::More)
            }
            // Pings and pongs are apparently are handled for us by the library.
            Message::Ping(_) | Message::Pong(_) => Ok(StreamRetry::More),
            // That should leave text and binary types, which we won't discriminate.
            Message::Binary(vec) if vec.is_empty() => Ok(StreamRetry::Stop),
            _ => {
                let payload = message.to_text().context("converting payload to text")?;
                self.insert_event(payload, stream_id, *stream_event_index)
                    .await
                    .context("inserting event")?;
                Ok(StreamRetry::More)
            }
        }
    }

    /// Only returns receive errors. Logs acknowledgement errors (but still returns).
    async fn websocket_handler_err(
        &self,
        mut websocket: WebSocket,
        headers: &HeaderMap,
    ) -> Result<(), Error> {
        let stream_id = self
            .new_stream(headers)
            .await
            .context("creating new stream")
            .map_err(Handle)?;
        // TODO: Flush streams
        let mut total_events = 0;
        let mut stream_event_index = 0;
        let result = loop {
            let (batch_count, last_recv_result) = Self::receive_consecutive_websocket_messages(
                &mut websocket,
                |message| async move {
                    // TODO: Take db_conn lock on first event.
                    self.handle_message(message, stream_id, &mut stream_event_index)
                        .await
                },
            )
            .await;
            info!(batch_count, %stream_id, "inserted consecutive payloads");
            if batch_count != 0 {
                // Just flush the events.
                self.db_conn
                    .lock()
                    .await
                    .flush()
                    .await
                    .context("flushing consecutive payloads")
                    .map_err(Handle)?;
                total_events += batch_count;
                if let Err(err) = Self::acknowledge_inserted(&mut websocket, total_events).await {
                    // Report the acknowledgment error, which is pretty important, and return with
                    // whatever the recv result was.
                    error!(?err, "acknowledging received");
                    break last_recv_result.map(|_| ());
                }
            }
            match last_recv_result {
                Err(err) => {
                    break Err(err);
                }
                Ok(StreamRetry::Stop) => {
                    break Ok(());
                }
                Ok(StreamRetry::More) => {}
            }
        };
        match &result {
            Ok(()) => {
                info!(%stream_id, total_events, "stream ended");
            }
            Err(err) => {
                info!(%stream_id, total_events, %err, "stream ended");
            }
        }
        result
    }

    async fn receive_consecutive_websocket_messages<F>(
        websocket: &mut WebSocket,
        mut handle: impl FnMut(Message) -> F,
    ) -> (u64, Result<StreamRetry, Error>)
    where
        F: Future<Output = Result<StreamRetry>>,
    {
        let mut count = 0;
        let result = loop {
            let mut nonblocking = poll_fn(|_cx| {
                if count == 0 {
                    Poll::Pending
                } else {
                    Poll::Ready(())
                }
            })
            .fuse();
            let option_recv = select_biased! {
                a = websocket.recv().fuse() => a,
                () = nonblocking => {
                    assert!(count > 0);
                    break Ok(StreamRetry::More);
                }
            };
            match match option_recv {
                Some(Ok(message)) => match handle(message).await {
                    Ok(more) => {
                        count += 1;
                        more
                    }
                    Err(err) => break Err(Handle(err)),
                },
                Some(Err(err)) => {
                    break Err(Recv(err));
                }
                None => break Ok(StreamRetry::Stop),
            } {
                StreamRetry::More => {}
                StreamRetry::Stop => break Ok(StreamRetry::Stop),
            }
        };
        (count, result)
    }

    async fn acknowledge_inserted(
        websocket: &mut WebSocket,
        counter: u64,
    ) -> Result<(), axum::Error> {
        websocket.send(Message::Text(counter.to_string())).await
    }

    // Eventually this might return a list of items added, or a count, so that callers can throw
    // away what they know was committed.
    async fn post_handler(
        &self,
        req: axum::http::Request<axum::body::Body>,
    ) -> (StatusCode, String) {
        let mut payloads_inserted = 0;
        let status_code = self
            .post_handler_status_code(req, &mut payloads_inserted)
            .await;
        info!(payloads_inserted, "submit handled ok");
        (status_code, format!("{}", payloads_inserted))
    }

    async fn new_stream(&self, headers: &HeaderMap) -> anyhow::Result<StreamId> {
        let headers_value = headers_to_json_value(headers)?;
        let mut conn = self.db_conn.lock().await;
        let stream_id = conn.new_stream(headers_value).await?;
        info!(%stream_id, "started new stream");
        Ok(stream_id)
    }

    async fn insert_event(
        &self,
        payload: &str,
        stream_id: StreamId,
        stream_event_index: StreamEventIndex,
    ) -> Result<()> {
        // Down the track this could be done in a separate thread, or under a transaction each time
        // we read a chunk.
        debug!(payload, "inserting payload into store");
        let mut conn = self.db_conn.lock().await;
        conn.insert_event(stream_id, stream_event_index, payload)
            .await
            .context("inserting payload into store")
    }

    async fn post_handler_status_code(
        &self,
        req: axum::http::Request<axum::body::Body>,
        payloads_inserted: &mut u64,
    ) -> StatusCode {
        let stream_id = match self.new_stream(req.headers()).await {
            Err(err) => {
                error!(?err, "creating new stream");
                return StatusCode::INTERNAL_SERVER_ERROR;
            }
            Ok(ok) => ok,
        };
        let body_data_stream = req.into_body().into_data_stream();
        let mut stream_event_index = 0;
        let result = iter_json_stream(body_data_stream, move |payload| {
            *payloads_inserted += 1;
            stream_event_index += 1;
            assert_eq!(stream_event_index, *payloads_inserted);
            async move {
                // sqlite needs to be given text.
                let payload = std::str::from_utf8(&payload).unwrap();
                self.insert_event(payload, stream_id, stream_event_index)
                    .await?;
                Ok(())
            }
        })
        .await;
        match result {
            Ok(()) => {}
            Err((err, code)) => {
                error!(?err, "error while iterating json stream");
                return code;
            }
        }
        StatusCode::OK
    }
}

/// Serializes HTTP request headers to a JSON value for storage with a stream.
fn headers_to_json_value(headers: &HeaderMap) -> serde_json::Result<serde_json::Value> {
    // This converts duplicate header values to an array, and seems to leave single header values
    // alone. This is needed to fix JSON containing backslashes for some values when those should be
    // valid objects.
    http_serde::header_map::serialize(headers, serde_json::value::Serializer)
}