1use spatio::Spatio;
2use spatio_types::point::Point3d;
3use std::sync::Arc;
4use tokio::sync::mpsc;
5
6#[derive(Debug)]
8pub enum WriteOp {
9 Upsert {
10 namespace: String,
11 id: String,
12 point: Point3d,
13 metadata: serde_json::Value,
14 },
15 Delete {
16 namespace: String,
17 id: String,
18 },
19 InsertTrajectory {
20 namespace: String,
21 id: String,
22 trajectory: Vec<(f64, Point3d, serde_json::Value)>,
23 },
24}
25
26pub fn spawn_background_writer(db: Arc<Spatio>, buffer_size: usize) -> mpsc::Sender<WriteOp> {
28 let (tx, mut rx) = mpsc::channel(buffer_size);
29
30 std::thread::spawn(move || {
32 while let Some(op) = rx.blocking_recv() {
33 match op {
34 WriteOp::Upsert {
35 namespace,
36 id,
37 point,
38 metadata,
39 } => {
40 if let Err(e) = db.upsert(&namespace, &id, point, metadata, None) {
41 tracing::error!("Background write failed (upsert): {}", e);
42 }
43 }
44 WriteOp::Delete { namespace, id } => {
45 if let Err(e) = db.delete(&namespace, &id) {
46 tracing::error!("Background write failed (delete): {}", e);
47 }
48 }
49 WriteOp::InsertTrajectory {
50 namespace,
51 id,
52 trajectory,
53 } => {
54 let updates: Vec<spatio::config::TemporalPoint> = trajectory
55 .into_iter()
56 .map(|(ts, p, _meta)| {
57 let timestamp =
58 std::time::UNIX_EPOCH + std::time::Duration::from_secs_f64(ts);
59 spatio::config::TemporalPoint::new(*p.point_2d(), timestamp)
60 })
61 .collect();
62
63 if let Err(e) = db.insert_trajectory(&namespace, &id, &updates) {
64 tracing::error!("Background write failed (insert_trajectory): {}", e);
65 }
66 }
67 }
68 }
69 tracing::info!("Background writer shutting down");
70 });
71
72 tx
73}