Skip to main content

infinite_db/bulk/
signal.rs

1//! Bulk signal sample import.
2
3use bincode::{config::standard, encode_to_vec};
4
5use crate::infinitedb_core::{
6    address::{DimensionVector, RevisionId, SpaceId},
7    signal::SignalSample,
8};
9
10use super::session::{BulkSessionCore, BulkWriteOptions, BulkWriteResult};
11use super::super::InfiniteDb;
12
13/// Bulk insert/delete session for [`SignalSample`] values in one space.
14pub struct BulkSignalImport<'a> {
15    session: BulkSessionCore<'a>,
16    space: SpaceId,
17}
18
19impl<'a> BulkSignalImport<'a> {
20    pub fn space(&self) -> SpaceId {
21        self.space
22    }
23
24    pub fn push(&mut self, sample: SignalSample) -> std::io::Result<RevisionId> {
25        sample.validate().map_err(|e| {
26            std::io::Error::new(std::io::ErrorKind::InvalidInput, format!("{:?}", e))
27        })?;
28        let full_coords = sample
29            .scope
30            .address_coords(&sample.local_coords)
31            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, format!("{:?}", e)))?;
32        if let Some(cfg) = self.session.db.spaces.get(self.space) {
33            if cfg.dims != full_coords.len() {
34                return Err(std::io::Error::new(
35                    std::io::ErrorKind::InvalidInput,
36                    "signal coordinates do not match space dimensions",
37                ));
38            }
39        }
40        let data = encode_to_vec(&sample, standard())
41            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
42        let rev = self
43            .session
44            .push_write(self.space, DimensionVector::new(full_coords), data)?;
45        #[cfg(feature = "sync")]
46        {
47            let sample_clone = sample;
48            self.session.defer_sync(crate::infinitedb_sync::transport::SyncOperation::WriteSignal {
49                space: self.space,
50                sample: sample_clone,
51                revision: rev,
52            });
53        }
54        Ok(rev)
55    }
56
57    /// Logical delete: tombstone at the sample's resolved address.
58    pub fn push_delete(&mut self, sample: SignalSample) -> std::io::Result<RevisionId> {
59        sample.validate().map_err(|e| {
60            std::io::Error::new(std::io::ErrorKind::InvalidInput, format!("{:?}", e))
61        })?;
62        let full_coords = sample
63            .scope
64            .address_coords(&sample.local_coords)
65            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, format!("{:?}", e)))?;
66        self.session
67            .push_tombstone(self.space, DimensionVector::new(full_coords))
68    }
69
70    pub fn finish(self) -> std::io::Result<BulkWriteResult> {
71        self.session.finish()
72    }
73}
74
75impl InfiniteDb {
76    /// Begin a buffered bulk signal import into `space`.
77    pub fn begin_signal_import(&mut self, space: SpaceId) -> std::io::Result<BulkSignalImport<'_>> {
78        self.begin_signal_import_with_options(space, BulkWriteOptions::default())
79    }
80
81    pub fn begin_signal_import_with_options(
82        &mut self,
83        space: SpaceId,
84        options: BulkWriteOptions,
85    ) -> std::io::Result<BulkSignalImport<'_>> {
86        Ok(BulkSignalImport {
87            session: BulkSessionCore::begin(self, options)?,
88            space,
89        })
90    }
91
92    pub fn insert_signals_bulk<I>(
93        &mut self,
94        space: SpaceId,
95        samples: I,
96    ) -> std::io::Result<BulkWriteResult>
97    where
98        I: IntoIterator<Item = SignalSample>,
99    {
100        let mut import = self.begin_signal_import(space)?;
101        for sample in samples {
102            import.push(sample)?;
103        }
104        import.finish()
105    }
106
107    pub fn delete_signals_bulk<I>(
108        &mut self,
109        space: SpaceId,
110        samples: I,
111    ) -> std::io::Result<BulkWriteResult>
112    where
113        I: IntoIterator<Item = SignalSample>,
114    {
115        let mut import = self.begin_signal_import(space)?;
116        for sample in samples {
117            import.push_delete(sample)?;
118        }
119        import.finish()
120    }
121}