infinite_db/bulk/
signal.rs1use bincode::{config::standard, encode_to_vec};
4
5use crate::infinitedb_core::{
6 address::{DimensionVector, RevisionId, SpaceId},
7 signal::SignalSample,
8};
9
10use super::session::{BulkSessionCore, BulkWriteOptions, BulkWriteResult};
11use super::super::InfiniteDb;
12
13pub struct BulkSignalImport<'a> {
15 session: BulkSessionCore<'a>,
16 space: SpaceId,
17}
18
19impl<'a> BulkSignalImport<'a> {
20 pub fn space(&self) -> SpaceId {
21 self.space
22 }
23
24 pub fn push(&mut self, sample: SignalSample) -> std::io::Result<RevisionId> {
25 sample.validate().map_err(|e| {
26 std::io::Error::new(std::io::ErrorKind::InvalidInput, format!("{:?}", e))
27 })?;
28 let full_coords = sample
29 .scope
30 .address_coords(&sample.local_coords)
31 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, format!("{:?}", e)))?;
32 if let Some(cfg) = self.session.db.spaces.get(self.space) {
33 if cfg.dims != full_coords.len() {
34 return Err(std::io::Error::new(
35 std::io::ErrorKind::InvalidInput,
36 "signal coordinates do not match space dimensions",
37 ));
38 }
39 }
40 let data = encode_to_vec(&sample, standard())
41 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
42 let rev = self
43 .session
44 .push_write(self.space, DimensionVector::new(full_coords), data)?;
45 #[cfg(feature = "sync")]
46 {
47 let sample_clone = sample;
48 self.session.defer_sync(crate::infinitedb_sync::transport::SyncOperation::WriteSignal {
49 space: self.space,
50 sample: sample_clone,
51 revision: rev,
52 });
53 }
54 Ok(rev)
55 }
56
57 pub fn push_delete(&mut self, sample: SignalSample) -> std::io::Result<RevisionId> {
59 sample.validate().map_err(|e| {
60 std::io::Error::new(std::io::ErrorKind::InvalidInput, format!("{:?}", e))
61 })?;
62 let full_coords = sample
63 .scope
64 .address_coords(&sample.local_coords)
65 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, format!("{:?}", e)))?;
66 self.session
67 .push_tombstone(self.space, DimensionVector::new(full_coords))
68 }
69
70 pub fn finish(self) -> std::io::Result<BulkWriteResult> {
71 self.session.finish()
72 }
73}
74
75impl InfiniteDb {
76 pub fn begin_signal_import(&mut self, space: SpaceId) -> std::io::Result<BulkSignalImport<'_>> {
78 self.begin_signal_import_with_options(space, BulkWriteOptions::default())
79 }
80
81 pub fn begin_signal_import_with_options(
82 &mut self,
83 space: SpaceId,
84 options: BulkWriteOptions,
85 ) -> std::io::Result<BulkSignalImport<'_>> {
86 Ok(BulkSignalImport {
87 session: BulkSessionCore::begin(self, options)?,
88 space,
89 })
90 }
91
92 pub fn insert_signals_bulk<I>(
93 &mut self,
94 space: SpaceId,
95 samples: I,
96 ) -> std::io::Result<BulkWriteResult>
97 where
98 I: IntoIterator<Item = SignalSample>,
99 {
100 let mut import = self.begin_signal_import(space)?;
101 for sample in samples {
102 import.push(sample)?;
103 }
104 import.finish()
105 }
106
107 pub fn delete_signals_bulk<I>(
108 &mut self,
109 space: SpaceId,
110 samples: I,
111 ) -> std::io::Result<BulkWriteResult>
112 where
113 I: IntoIterator<Item = SignalSample>,
114 {
115 let mut import = self.begin_signal_import(space)?;
116 for sample in samples {
117 import.push_delete(sample)?;
118 }
119 import.finish()
120 }
121}