// blake_streams_core/store.rs

1use crate::stream::StreamLock;
2use crate::{DocId, PeerId, SignedHead, Slice, Stream, StreamId, StreamReader, StreamWriter};
3use anyhow::Result;
4use bao::encode::SliceExtractor;
5use ed25519_dalek::{Keypair, PublicKey};
6use fnv::{FnvHashMap, FnvHashSet};
7use parking_lot::Mutex;
8use rkyv::{Archive, Deserialize, Infallible};
9use std::fs::File;
10use std::io::{Cursor, Read};
11use std::marker::PhantomData;
12use std::ops::Deref;
13use std::path::{Path, PathBuf};
14use std::sync::Arc;
15use zerocopy::AsBytes;
16
/// Zero-copy view over a `sled::IVec` that holds the rkyv-archived bytes of a `T`.
///
/// The raw bytes stay inside the reference-counted `IVec`; the `Deref` impl
/// below reinterprets them as `T::Archived` without deserializing.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
struct ZeroCopy<T> {
    // Records the logical element type without storing a `T`.
    _marker: PhantomData<T>,
    // The archived bytes exactly as stored in sled.
    ivec: sled::IVec,
}
22
23impl<T: Archive> ZeroCopy<T> {
24    fn new(ivec: sled::IVec) -> Self {
25        Self {
26            _marker: PhantomData,
27            ivec,
28        }
29    }
30
31    fn to_inner(&self) -> T
32    where
33        T::Archived: Deserialize<T, Infallible>,
34    {
35        self.deref().deserialize(&mut Infallible).unwrap()
36    }
37}
38
impl<T: Archive> Deref for ZeroCopy<T> {
    type Target = T::Archived;

    /// Reinterprets the stored bytes as the archived form of `T` without copying.
    fn deref(&self) -> &Self::Target {
        // SAFETY: assumes every `ZeroCopy` is constructed around bytes that
        // are a valid rkyv archive of `T` (e.g. the records written via
        // `Stream::new(..).to_bytes()` in `get_or_create_stream`).
        // `archived_root` performs no validation, so a corrupted or
        // foreign db entry would be undefined behavior here.
        // NOTE(review): consider `rkyv::check_archived_root` if the
        // validation feature is available — TODO confirm all call sites
        // (including the key ivecs wrapped in `docs`/`streams`) uphold this.
        unsafe { rkyv::archived_root::<T>(&self.ivec[..]) }
    }
}
46
47impl<T> AsRef<[u8]> for ZeroCopy<T> {
48    fn as_ref(&self) -> &[u8] {
49        &self.ivec
50    }
51}
52
53impl<T> From<&ZeroCopy<T>> for sled::IVec {
54    fn from(zc: &ZeroCopy<T>) -> Self {
55        zc.ivec.clone()
56    }
57}
58
/// Persistent store for streams: metadata records live in sled, stream
/// content lives in per-stream files under `dir`.
pub struct StreamStorage {
    // Default sled tree; `get_or_create_stream` writes the serialized
    // `Stream` record here keyed by `StreamId` bytes.
    db: sled::Db,
    // "docs" tree, keyed by `DocId`. NOTE(review): nothing in this file
    // writes to it — presumably populated elsewhere; verify.
    docs: sled::Tree,
    // "streams" tree, keyed by `StreamId`. NOTE(review): also never written
    // here, yet `contains_stream`/`streams` read it — confirm the writer.
    streams: sled::Tree,
    // Root directory for stream content files (`<open dir>/streams`).
    dir: PathBuf,
    // Node identity; handed to `StreamWriter` by `append`.
    key: Arc<Keypair>,
    // Ids of streams that currently hold an exclusive `StreamLock`.
    locks: Arc<Mutex<FnvHashSet<StreamId>>>,
    // Cache of computed per-stream file paths.
    paths: FnvHashMap<StreamId, PathBuf>,
}
68
69impl StreamStorage {
70    pub fn open(dir: &Path, key: Keypair) -> Result<Self> {
71        let db = sled::open(dir.join("db"))?;
72        let docs = db.open_tree("docs")?;
73        let streams = db.open_tree("streams")?;
74        let dir = dir.join("streams");
75        std::fs::create_dir_all(&dir)?;
76        Ok(Self {
77            db,
78            docs,
79            streams,
80            dir,
81            key: Arc::new(key),
82            locks: Default::default(),
83            paths: Default::default(),
84        })
85    }
86
87    pub fn public_key(&self) -> &PublicKey {
88        &self.key.public
89    }
90
91    fn get_stream(&self, id: &StreamId) -> Result<Option<ZeroCopy<Stream>>> {
92        if let Some(bytes) = self.db.get(id.as_bytes())? {
93            Ok(Some(ZeroCopy::new(bytes)))
94        } else {
95            Ok(None)
96        }
97    }
98
99    fn get_or_create_stream(&mut self, id: &StreamId) -> Result<(Stream, StreamLock)> {
100        if !self.contains_stream(&id)? {
101            let path = self.stream_path(&id);
102            std::fs::create_dir_all(path.parent().unwrap())?;
103            let stream = Stream::new(*id).to_bytes()?.into_vec();
104            self.db.insert(id.as_bytes(), &stream[..])?;
105        }
106        let lock = self.lock_stream(id.clone())?;
107        if let Some(stream) = self.get_stream(&id)? {
108            Ok((stream.to_inner(), lock))
109        } else {
110            return Err(anyhow::anyhow!("stream doesn't exist"));
111        }
112    }
113
114    fn lock_stream(&self, id: StreamId) -> Result<StreamLock> {
115        if !self.locks.lock().insert(id) {
116            return Err(anyhow::anyhow!("stream busy"));
117        }
118        Ok(StreamLock::new(id, self.locks.clone()))
119    }
120
121    fn stream_path(&mut self, id: &StreamId) -> &Path {
122        if !self.paths.contains_key(id) {
123            let path = self
124                .dir
125                .join(id.doc().to_string())
126                .join(id.peer().to_string());
127            self.paths.insert(*id, path);
128        }
129        self.paths.get(id).unwrap()
130    }
131
132    pub fn contains_doc(&self, doc: &DocId) -> Result<bool> {
133        Ok(self.docs.contains_key(doc.as_bytes())?)
134    }
135
136    pub fn docs(&self) -> impl Iterator<Item = Result<DocId>> {
137        self.docs
138            .iter()
139            .keys()
140            .map(|res| Ok(ZeroCopy::<DocId>::new(res?).to_inner()))
141    }
142
143    pub fn contains_stream(&self, id: &StreamId) -> Result<bool> {
144        Ok(self.streams.contains_key(id.as_bytes())?)
145    }
146
147    pub fn streams(&self) -> impl Iterator<Item = Result<StreamId>> {
148        self.streams
149            .iter()
150            .keys()
151            .map(|res| Ok(ZeroCopy::<StreamId>::new(res?).to_inner()))
152    }
153
154    pub fn substreams(&self, doc: DocId) -> impl Iterator<Item = Result<StreamId>> {
155        self.streams
156            .scan_prefix(doc.as_bytes())
157            .keys()
158            .map(|res| Ok(ZeroCopy::<StreamId>::new(res?).to_inner()))
159    }
160
161    pub fn head(&self, id: &StreamId) -> Result<Option<SignedHead>> {
162        if let Some(stream) = self.db.get(id.as_bytes())? {
163            let stream = ZeroCopy::<Stream>::new(stream);
164            let head = stream.head.deserialize(&mut Infallible)?;
165            return Ok(Some(head));
166        }
167        Ok(None)
168    }
169
170    pub fn append(&mut self, doc: DocId) -> Result<StreamWriter<Arc<Keypair>>> {
171        let id = StreamId::new(PeerId::from(self.key.public), doc);
172        let (stream, lock) = self.get_or_create_stream(&id)?;
173        let db = self.db.clone();
174        let key = self.key.clone();
175        let path = self.stream_path(&id);
176        Ok(StreamWriter::new(path, stream, lock, db, key)?)
177    }
178
179    pub fn subscribe(&mut self, id: &StreamId) -> Result<StreamWriter<()>> {
180        let (stream, lock) = self.get_or_create_stream(id)?;
181        let db = self.db.clone();
182        let path = self.stream_path(id);
183        Ok(StreamWriter::new(path, stream, lock, db, ())?)
184    }
185
186    pub fn remove(&mut self, id: &StreamId) -> Result<()> {
187        let _lock = self.lock_stream(id.clone())?;
188        // this is safe to do on linux as long as there are only readers.
189        // the file will be deleted after the last reader is dropped.
190        std::fs::remove_file(self.stream_path(id))?;
191        self.db.remove(id.as_bytes())?;
192        Ok(())
193    }
194
195    pub fn slice(&mut self, id: &StreamId, start: u64, len: u64) -> Result<StreamReader> {
196        let stream = if let Some(stream) = self.get_stream(id)? {
197            stream
198        } else {
199            return Err(anyhow::anyhow!("stream doesn't exist"));
200        };
201        StreamReader::new(self.stream_path(id), &stream.head.head, start, len)
202    }
203
204    pub fn extract(
205        &mut self,
206        id: &StreamId,
207        start: u64,
208        len: u64,
209        slice: &mut Slice,
210    ) -> Result<()> {
211        slice.data.clear();
212        let stream = if let Some(stream) = self.get_stream(id)? {
213            stream
214        } else {
215            return Err(anyhow::anyhow!("stream doesn't exist"));
216        };
217        if len > stream.head.head().len() {
218            return Err(anyhow::anyhow!("trying to read after current head"));
219        }
220        let file = File::open(self.stream_path(id))?;
221        slice.head = stream.head;
222        let mut extractor =
223            SliceExtractor::new_outboard(file, Cursor::new(&stream.outboard), start, len);
224        extractor.read_to_end(&mut slice.data)?;
225        Ok(())
226    }
227}