1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
use crate::stream::StreamLock;
use crate::{Head, SignedHead, Stream, StreamId};
use anyhow::Result;
use bao::encode::Encoder;
use ed25519_dalek::Keypair;
use std::fs::{File, OpenOptions};
use std::io::{self, Cursor, Read, Write};
use std::path::Path;
use std::sync::Arc;
use zerocopy::AsBytes;
pub struct StreamWriter<T> {
db: sled::Db,
_lock: StreamLock,
file: File,
encoder: Encoder<Cursor<Vec<u8>>>,
stream: Stream,
key: T,
}
impl<T> StreamWriter<T> {
pub(crate) fn new(
path: &Path,
stream: Stream,
lock: StreamLock,
db: sled::Db,
key: T,
) -> Result<Self> {
tracing::debug!("opening stream {} {}", path.display(), stream.head.head.len);
let mut file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(path)?;
let outboard = Vec::with_capacity(stream.outboard.len() * 2);
let mut encoder = Encoder::new_outboard(Cursor::new(outboard));
let mut pos = 0;
let mut buf = [0u8; 8192];
while pos < stream.head.head.len {
let npos = u64::min(pos + buf.len() as u64, stream.head.head.len);
let n = (npos - pos) as usize;
file.read_exact(&mut buf[..n])?;
encoder.write_all(&mut buf[..n])?;
pos = npos;
}
Ok(StreamWriter {
db,
_lock: lock,
file,
encoder,
stream,
key,
})
}
pub fn id(&self) -> &StreamId {
self.stream.head().id()
}
pub fn head(&self) -> &SignedHead {
&self.stream.head
}
fn finalize(&mut self) -> io::Result<()> {
self.flush()?;
let mut encoder = self.encoder.clone();
let hash = encoder.finalize()?;
let outboard = encoder.into_inner();
self.stream.head.head.hash = hash.into();
self.stream.outboard = outboard.into_inner();
Ok(())
}
fn save(&mut self) -> io::Result<()> {
let stream = self
.stream
.to_bytes()
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?
.into_vec();
self.db.insert(self.id().as_bytes(), stream)?;
Ok(())
}
}
impl StreamWriter<()> {
pub fn commit(&mut self, sig: [u8; 64]) -> io::Result<Head> {
self.finalize()?;
self.stream
.head
.set_signature(sig)
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
self.save()?;
Ok(self.stream.head.head)
}
}
impl StreamWriter<Arc<Keypair>> {
pub fn commit(&mut self) -> io::Result<SignedHead> {
self.finalize()?;
self.stream.head.sign(&self.key);
self.save()?;
Ok(self.stream.head)
}
}
impl<T> Write for StreamWriter<T> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.file.write_all(buf)?;
self.encoder.write_all(buf)?;
self.stream.head.head.len += buf.len() as u64;
Ok(buf.len())
}
fn flush(&mut self) -> io::Result<()> {
self.file.flush()?;
self.encoder.flush()?;
Ok(())
}
}