spatio_server/
writer.rs

1use spatio::Spatio;
2use spatio_types::point::Point3d;
3use std::sync::Arc;
4use tokio::sync::mpsc;
5
6/// Write operation to be buffered and executed by background worker
7#[derive(Debug)]
8pub enum WriteOp {
9    Upsert {
10        namespace: String,
11        id: String,
12        point: Point3d,
13        metadata: serde_json::Value,
14    },
15    Delete {
16        namespace: String,
17        id: String,
18    },
19    InsertTrajectory {
20        namespace: String,
21        id: String,
22        trajectory: Vec<(f64, Point3d, serde_json::Value)>,
23    },
24}
25
26/// Returns the sender channel to be used by the handler
27pub fn spawn_background_writer(db: Arc<Spatio>, buffer_size: usize) -> mpsc::Sender<WriteOp> {
28    let (tx, mut rx) = mpsc::channel(buffer_size);
29
30    // Spawn a dedicated thread for writing to ensure we don't block tokio runtime
31    std::thread::spawn(move || {
32        while let Some(op) = rx.blocking_recv() {
33            match op {
34                WriteOp::Upsert {
35                    namespace,
36                    id,
37                    point,
38                    metadata,
39                } => {
40                    if let Err(e) = db.upsert(&namespace, &id, point, metadata, None) {
41                        tracing::error!("Background write failed (upsert): {}", e);
42                    }
43                }
44                WriteOp::Delete { namespace, id } => {
45                    if let Err(e) = db.delete(&namespace, &id) {
46                        tracing::error!("Background write failed (delete): {}", e);
47                    }
48                }
49                WriteOp::InsertTrajectory {
50                    namespace,
51                    id,
52                    trajectory,
53                } => {
54                    let updates: Vec<spatio::config::TemporalPoint> = trajectory
55                        .into_iter()
56                        .map(|(ts, p, _meta)| {
57                            let timestamp =
58                                std::time::UNIX_EPOCH + std::time::Duration::from_secs_f64(ts);
59                            spatio::config::TemporalPoint::new(*p.point_2d(), timestamp)
60                        })
61                        .collect();
62
63                    if let Err(e) = db.insert_trajectory(&namespace, &id, &updates) {
64                        tracing::error!("Background write failed (insert_trajectory): {}", e);
65                    }
66                }
67            }
68        }
69        tracing::info!("Background writer shutting down");
70    });
71
72    tx
73}