blake_streams_core/
write.rs

1use crate::stream::StreamLock;
2use crate::{Head, SignedHead, Stream, StreamId};
3use anyhow::Result;
4use bao::encode::Encoder;
5use ed25519_dalek::Keypair;
6use std::fs::{File, OpenOptions};
7use std::io::{self, Cursor, Read, Write};
8use std::path::Path;
9use std::sync::Arc;
10use zerocopy::AsBytes;
11
12pub struct StreamWriter<T> {
13    db: sled::Db,
14    _lock: StreamLock,
15    file: File,
16    encoder: Encoder<Cursor<Vec<u8>>>,
17    stream: Stream,
18    key: T,
19}
20
21impl<T> StreamWriter<T> {
22    pub(crate) fn new(
23        path: &Path,
24        stream: Stream,
25        lock: StreamLock,
26        db: sled::Db,
27        key: T,
28    ) -> Result<Self> {
29        tracing::debug!("opening stream {} {}", path.display(), stream.head.head.len);
30        let mut file = OpenOptions::new()
31            .read(true)
32            .write(true)
33            .create(true)
34            .open(path)?;
35        let outboard = Vec::with_capacity(stream.outboard.len() * 2);
36        let mut encoder = Encoder::new_outboard(Cursor::new(outboard));
37        // TODO: this can probably be optimized: discuss with bao author.
38        let mut pos = 0;
39        let mut buf = [0u8; 8192];
40        while pos < stream.head.head.len {
41            let npos = u64::min(pos + buf.len() as u64, stream.head.head.len);
42            let n = (npos - pos) as usize;
43            file.read_exact(&mut buf[..n])?;
44            encoder.write_all(&mut buf[..n])?;
45            pos = npos;
46        }
47        Ok(StreamWriter {
48            db,
49            _lock: lock,
50            file,
51            encoder,
52            stream,
53            key,
54        })
55    }
56
57    pub fn id(&self) -> &StreamId {
58        self.stream.head().id()
59    }
60
61    pub fn head(&self) -> &SignedHead {
62        &self.stream.head
63    }
64
65    fn finalize(&mut self) -> io::Result<()> {
66        self.flush()?;
67        let mut encoder = self.encoder.clone();
68        let hash = encoder.finalize()?;
69        let outboard = encoder.into_inner();
70        self.stream.head.head.hash = hash.into();
71        self.stream.outboard = outboard.into_inner();
72        Ok(())
73    }
74
75    fn save(&mut self) -> io::Result<()> {
76        let stream = self
77            .stream
78            .to_bytes()
79            .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?
80            .into_vec();
81        self.db.insert(self.id().as_bytes(), stream)?;
82        Ok(())
83    }
84}
85
86impl StreamWriter<()> {
87    pub fn commit(&mut self, sig: [u8; 64]) -> io::Result<Head> {
88        self.finalize()?;
89        self.stream
90            .head
91            .set_signature(sig)
92            .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
93        self.save()?;
94        Ok(self.stream.head.head)
95    }
96}
97
98impl StreamWriter<Arc<Keypair>> {
99    pub fn commit(&mut self) -> io::Result<SignedHead> {
100        self.finalize()?;
101        self.stream.head.sign(&self.key);
102        self.save()?;
103        Ok(self.stream.head)
104    }
105}
106
107impl<T> Write for StreamWriter<T> {
108    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
109        self.file.write_all(buf)?;
110        self.encoder.write_all(buf)?;
111        self.stream.head.head.len += buf.len() as u64;
112        Ok(buf.len())
113    }
114
115    fn flush(&mut self) -> io::Result<()> {
116        self.file.flush()?;
117        self.encoder.flush()?;
118        Ok(())
119    }
120}