blake_streams_core/
store.rs1use crate::stream::StreamLock;
2use crate::{DocId, PeerId, SignedHead, Slice, Stream, StreamId, StreamReader, StreamWriter};
3use anyhow::Result;
4use bao::encode::SliceExtractor;
5use ed25519_dalek::{Keypair, PublicKey};
6use fnv::{FnvHashMap, FnvHashSet};
7use parking_lot::Mutex;
8use rkyv::{Archive, Deserialize, Infallible};
9use std::fs::File;
10use std::io::{Cursor, Read};
11use std::marker::PhantomData;
12use std::ops::Deref;
13use std::path::{Path, PathBuf};
14use std::sync::Arc;
15use zerocopy::AsBytes;
16
17#[derive(Clone, Debug, Eq, Hash, PartialEq)]
18struct ZeroCopy<T> {
19 _marker: PhantomData<T>,
20 ivec: sled::IVec,
21}
22
23impl<T: Archive> ZeroCopy<T> {
24 fn new(ivec: sled::IVec) -> Self {
25 Self {
26 _marker: PhantomData,
27 ivec,
28 }
29 }
30
31 fn to_inner(&self) -> T
32 where
33 T::Archived: Deserialize<T, Infallible>,
34 {
35 self.deref().deserialize(&mut Infallible).unwrap()
36 }
37}
38
39impl<T: Archive> Deref for ZeroCopy<T> {
40 type Target = T::Archived;
41
42 fn deref(&self) -> &Self::Target {
43 unsafe { rkyv::archived_root::<T>(&self.ivec[..]) }
44 }
45}
46
47impl<T> AsRef<[u8]> for ZeroCopy<T> {
48 fn as_ref(&self) -> &[u8] {
49 &self.ivec
50 }
51}
52
53impl<T> From<&ZeroCopy<T>> for sled::IVec {
54 fn from(zc: &ZeroCopy<T>) -> Self {
55 zc.ivec.clone()
56 }
57}
58
59pub struct StreamStorage {
60 db: sled::Db,
61 docs: sled::Tree,
62 streams: sled::Tree,
63 dir: PathBuf,
64 key: Arc<Keypair>,
65 locks: Arc<Mutex<FnvHashSet<StreamId>>>,
66 paths: FnvHashMap<StreamId, PathBuf>,
67}
68
69impl StreamStorage {
70 pub fn open(dir: &Path, key: Keypair) -> Result<Self> {
71 let db = sled::open(dir.join("db"))?;
72 let docs = db.open_tree("docs")?;
73 let streams = db.open_tree("streams")?;
74 let dir = dir.join("streams");
75 std::fs::create_dir_all(&dir)?;
76 Ok(Self {
77 db,
78 docs,
79 streams,
80 dir,
81 key: Arc::new(key),
82 locks: Default::default(),
83 paths: Default::default(),
84 })
85 }
86
87 pub fn public_key(&self) -> &PublicKey {
88 &self.key.public
89 }
90
91 fn get_stream(&self, id: &StreamId) -> Result<Option<ZeroCopy<Stream>>> {
92 if let Some(bytes) = self.db.get(id.as_bytes())? {
93 Ok(Some(ZeroCopy::new(bytes)))
94 } else {
95 Ok(None)
96 }
97 }
98
99 fn get_or_create_stream(&mut self, id: &StreamId) -> Result<(Stream, StreamLock)> {
100 if !self.contains_stream(&id)? {
101 let path = self.stream_path(&id);
102 std::fs::create_dir_all(path.parent().unwrap())?;
103 let stream = Stream::new(*id).to_bytes()?.into_vec();
104 self.db.insert(id.as_bytes(), &stream[..])?;
105 }
106 let lock = self.lock_stream(id.clone())?;
107 if let Some(stream) = self.get_stream(&id)? {
108 Ok((stream.to_inner(), lock))
109 } else {
110 return Err(anyhow::anyhow!("stream doesn't exist"));
111 }
112 }
113
114 fn lock_stream(&self, id: StreamId) -> Result<StreamLock> {
115 if !self.locks.lock().insert(id) {
116 return Err(anyhow::anyhow!("stream busy"));
117 }
118 Ok(StreamLock::new(id, self.locks.clone()))
119 }
120
121 fn stream_path(&mut self, id: &StreamId) -> &Path {
122 if !self.paths.contains_key(id) {
123 let path = self
124 .dir
125 .join(id.doc().to_string())
126 .join(id.peer().to_string());
127 self.paths.insert(*id, path);
128 }
129 self.paths.get(id).unwrap()
130 }
131
132 pub fn contains_doc(&self, doc: &DocId) -> Result<bool> {
133 Ok(self.docs.contains_key(doc.as_bytes())?)
134 }
135
136 pub fn docs(&self) -> impl Iterator<Item = Result<DocId>> {
137 self.docs
138 .iter()
139 .keys()
140 .map(|res| Ok(ZeroCopy::<DocId>::new(res?).to_inner()))
141 }
142
143 pub fn contains_stream(&self, id: &StreamId) -> Result<bool> {
144 Ok(self.streams.contains_key(id.as_bytes())?)
145 }
146
147 pub fn streams(&self) -> impl Iterator<Item = Result<StreamId>> {
148 self.streams
149 .iter()
150 .keys()
151 .map(|res| Ok(ZeroCopy::<StreamId>::new(res?).to_inner()))
152 }
153
154 pub fn substreams(&self, doc: DocId) -> impl Iterator<Item = Result<StreamId>> {
155 self.streams
156 .scan_prefix(doc.as_bytes())
157 .keys()
158 .map(|res| Ok(ZeroCopy::<StreamId>::new(res?).to_inner()))
159 }
160
161 pub fn head(&self, id: &StreamId) -> Result<Option<SignedHead>> {
162 if let Some(stream) = self.db.get(id.as_bytes())? {
163 let stream = ZeroCopy::<Stream>::new(stream);
164 let head = stream.head.deserialize(&mut Infallible)?;
165 return Ok(Some(head));
166 }
167 Ok(None)
168 }
169
170 pub fn append(&mut self, doc: DocId) -> Result<StreamWriter<Arc<Keypair>>> {
171 let id = StreamId::new(PeerId::from(self.key.public), doc);
172 let (stream, lock) = self.get_or_create_stream(&id)?;
173 let db = self.db.clone();
174 let key = self.key.clone();
175 let path = self.stream_path(&id);
176 Ok(StreamWriter::new(path, stream, lock, db, key)?)
177 }
178
179 pub fn subscribe(&mut self, id: &StreamId) -> Result<StreamWriter<()>> {
180 let (stream, lock) = self.get_or_create_stream(id)?;
181 let db = self.db.clone();
182 let path = self.stream_path(id);
183 Ok(StreamWriter::new(path, stream, lock, db, ())?)
184 }
185
186 pub fn remove(&mut self, id: &StreamId) -> Result<()> {
187 let _lock = self.lock_stream(id.clone())?;
188 std::fs::remove_file(self.stream_path(id))?;
191 self.db.remove(id.as_bytes())?;
192 Ok(())
193 }
194
195 pub fn slice(&mut self, id: &StreamId, start: u64, len: u64) -> Result<StreamReader> {
196 let stream = if let Some(stream) = self.get_stream(id)? {
197 stream
198 } else {
199 return Err(anyhow::anyhow!("stream doesn't exist"));
200 };
201 StreamReader::new(self.stream_path(id), &stream.head.head, start, len)
202 }
203
204 pub fn extract(
205 &mut self,
206 id: &StreamId,
207 start: u64,
208 len: u64,
209 slice: &mut Slice,
210 ) -> Result<()> {
211 slice.data.clear();
212 let stream = if let Some(stream) = self.get_stream(id)? {
213 stream
214 } else {
215 return Err(anyhow::anyhow!("stream doesn't exist"));
216 };
217 if len > stream.head.head().len() {
218 return Err(anyhow::anyhow!("trying to read after current head"));
219 }
220 let file = File::open(self.stream_path(id))?;
221 slice.head = stream.head;
222 let mut extractor =
223 SliceExtractor::new_outboard(file, Cursor::new(&stream.outboard), start, len);
224 extractor.read_to_end(&mut slice.data)?;
225 Ok(())
226 }
227}