blake_streams_core/
write.rs1use crate::stream::StreamLock;
2use crate::{Head, SignedHead, Stream, StreamId};
3use anyhow::Result;
4use bao::encode::Encoder;
5use ed25519_dalek::Keypair;
6use std::fs::{File, OpenOptions};
7use std::io::{self, Cursor, Read, Write};
8use std::path::Path;
9use std::sync::Arc;
10use zerocopy::AsBytes;
11
12pub struct StreamWriter<T> {
13 db: sled::Db,
14 _lock: StreamLock,
15 file: File,
16 encoder: Encoder<Cursor<Vec<u8>>>,
17 stream: Stream,
18 key: T,
19}
20
21impl<T> StreamWriter<T> {
22 pub(crate) fn new(
23 path: &Path,
24 stream: Stream,
25 lock: StreamLock,
26 db: sled::Db,
27 key: T,
28 ) -> Result<Self> {
29 tracing::debug!("opening stream {} {}", path.display(), stream.head.head.len);
30 let mut file = OpenOptions::new()
31 .read(true)
32 .write(true)
33 .create(true)
34 .open(path)?;
35 let outboard = Vec::with_capacity(stream.outboard.len() * 2);
36 let mut encoder = Encoder::new_outboard(Cursor::new(outboard));
37 let mut pos = 0;
39 let mut buf = [0u8; 8192];
40 while pos < stream.head.head.len {
41 let npos = u64::min(pos + buf.len() as u64, stream.head.head.len);
42 let n = (npos - pos) as usize;
43 file.read_exact(&mut buf[..n])?;
44 encoder.write_all(&mut buf[..n])?;
45 pos = npos;
46 }
47 Ok(StreamWriter {
48 db,
49 _lock: lock,
50 file,
51 encoder,
52 stream,
53 key,
54 })
55 }
56
57 pub fn id(&self) -> &StreamId {
58 self.stream.head().id()
59 }
60
61 pub fn head(&self) -> &SignedHead {
62 &self.stream.head
63 }
64
65 fn finalize(&mut self) -> io::Result<()> {
66 self.flush()?;
67 let mut encoder = self.encoder.clone();
68 let hash = encoder.finalize()?;
69 let outboard = encoder.into_inner();
70 self.stream.head.head.hash = hash.into();
71 self.stream.outboard = outboard.into_inner();
72 Ok(())
73 }
74
75 fn save(&mut self) -> io::Result<()> {
76 let stream = self
77 .stream
78 .to_bytes()
79 .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?
80 .into_vec();
81 self.db.insert(self.id().as_bytes(), stream)?;
82 Ok(())
83 }
84}
85
86impl StreamWriter<()> {
87 pub fn commit(&mut self, sig: [u8; 64]) -> io::Result<Head> {
88 self.finalize()?;
89 self.stream
90 .head
91 .set_signature(sig)
92 .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
93 self.save()?;
94 Ok(self.stream.head.head)
95 }
96}
97
98impl StreamWriter<Arc<Keypair>> {
99 pub fn commit(&mut self) -> io::Result<SignedHead> {
100 self.finalize()?;
101 self.stream.head.sign(&self.key);
102 self.save()?;
103 Ok(self.stream.head)
104 }
105}
106
107impl<T> Write for StreamWriter<T> {
108 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
109 self.file.write_all(buf)?;
110 self.encoder.write_all(buf)?;
111 self.stream.head.head.len += buf.len() as u64;
112 Ok(buf.len())
113 }
114
115 fn flush(&mut self) -> io::Result<()> {
116 self.file.flush()?;
117 self.encoder.flush()?;
118 Ok(())
119 }
120}