1use anyhow::Result;
2use futures::channel::mpsc;
3use futures::prelude::*;
4use ipfs_embed::{Event, GossipEvent, LocalStreamWriter, SignedHead};
5use std::io::{self, Write};
6use std::pin::Pin;
7use std::task::{Context, Poll};
8use zerocopy::{AsBytes, LayoutVerified};
9
10pub type Ipfs = ipfs_embed::Ipfs<ipfs_embed::DefaultParams>;
11pub use blake_streams_core::PeerId;
12pub use ipfs_embed::{self, DocId, Head, StreamId, StreamReader, SwarmEvents};
13
14pub struct BlakeStreams {
15 ipfs: Ipfs,
16}
17
18impl BlakeStreams {
19 pub fn new(ipfs: Ipfs) -> Self {
20 Self { ipfs }
21 }
22
23 pub fn ipfs(&self) -> &Ipfs {
24 &self.ipfs
25 }
26
27 pub fn streams(&self) -> Result<Vec<StreamId>> {
28 self.ipfs.streams()
29 }
30
31 pub fn head(&self, id: &StreamId) -> Result<Option<SignedHead>> {
32 self.ipfs.stream_head(id)
33 }
34
35 pub fn slice(&self, id: &StreamId, start: u64, len: u64) -> Result<StreamReader> {
36 self.ipfs.stream_slice(id, start, len)
37 }
38
39 pub fn remove(&self, id: &StreamId) -> Result<()> {
40 self.ipfs.stream_remove(id)
41 }
42
43 pub fn link_stream(&self, id: &StreamId) -> Result<()> {
44 tracing::info!("{}: linking stream (peer {})", id.doc(), id.peer());
45 self.ipfs.stream_subscribe(id)
46 }
47
48 pub async fn subscribe(&self, doc: DocId) -> Result<DocStream> {
49 tracing::info!("{}: subscribe", doc);
50 let events = self.ipfs.swarm_events();
51 let (mut tx, rx) = mpsc::channel(1);
52 let _ = tx.try_send(());
53 let ipfs = self.ipfs.clone();
54 async_global_executor::spawn(doc_task(
55 doc,
56 ipfs.swarm_events(),
57 ipfs.subscribe(&doc.to_string())?,
58 ipfs,
59 tx.clone(),
60 rx,
61 ))
62 .detach();
63 Ok(DocStream {
64 doc,
65 ipfs: self.ipfs.clone(),
66 events,
67 publisher: tx,
68 })
69 }
70}
71
72async fn doc_task(
73 doc: DocId,
74 mut events: SwarmEvents,
75 mut stream: impl Stream<Item = GossipEvent> + Send + Unpin + 'static,
76 ipfs: Ipfs,
77 mut tx: mpsc::Sender<()>,
78 mut rx: mpsc::Receiver<()>,
79) {
80 let local_peer_id = ipfs.local_public_key().into();
81 loop {
82 futures::select! {
83 ev = rx.next().fuse() => match ev {
84 Some(()) => {
85 let head = match ipfs.stream_head(&StreamId::new(local_peer_id, doc)) {
86 Ok(Some(head)) => head,
87 Ok(None) => continue,
88 Err(err) => {
89 tracing::error!("fetching head err {}", err);
90 continue;
91 }
92 };
93 tracing::info!(
94 "{}: publish_head (peer {}) (offset {})",
95 doc,
96 head.head().id().peer(),
97 head.head().len(),
98 );
99 if let Err(err) = ipfs.publish(&doc.to_string(), head.as_bytes().to_vec()) {
100 tracing::info!("publish error: {}", err);
101 }
102 }
103 None => return,
104 },
105 ev = stream.next().fuse() => match ev {
106 Some(GossipEvent::Message(_, head)) => {
107 match LayoutVerified::<_, SignedHead>::new(&head[..]) {
108 Some(head) => {
109 tracing::info!(
110 "{}: new_head (peer {}) (offset {})",
111 doc,
112 head.head().id().peer(),
113 head.head().len(),
114 );
115 ipfs.stream_update_head(*head);
116 }
117 None => tracing::debug!("gossip: failed to decode head"),
118 }
119 }
120 Some(GossipEvent::Subscribed(peer_id)) => {
121 tracing::info!("{}: new_peer (peer {})", doc, peer_id);
122 ipfs.stream_add_peers(doc, std::iter::once(peer_id));
123 let _ = tx.try_send(());
124 }
125 _ => {}
126 },
127 ev = events.next().fuse() => match ev {
128 Some(Event::Discovered(peer_id)) => ipfs.dial(&peer_id),
129 _ => {}
130 }
131 }
132 }
133}
134
135pub struct DocStream {
136 doc: DocId,
137 ipfs: Ipfs,
138 events: SwarmEvents,
139 publisher: mpsc::Sender<()>,
140}
141
142impl Stream for DocStream {
143 type Item = Head;
144
145 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
146 loop {
147 return match Pin::new(&mut self.events).poll_next(cx) {
148 Poll::Ready(Some(Event::NewHead(head))) => {
149 if head.id().doc() != self.doc {
150 continue;
151 }
152 tracing::info!("{}: sync_complete (offset {})", self.doc, head.len());
153 Poll::Ready(Some(head))
154 }
155 Poll::Ready(None) => Poll::Ready(None),
156 Poll::Pending => Poll::Pending,
157 _ => continue,
158 };
159 }
160 }
161}
162
163impl DocStream {
164 pub fn append(&self) -> Result<DocWriter> {
165 let writer = self.ipfs.stream_append(self.doc)?;
166 Ok(DocWriter {
167 inner: writer,
168 publisher: self.publisher.clone(),
169 })
170 }
171}
172
173pub struct DocWriter {
174 inner: LocalStreamWriter,
175 publisher: mpsc::Sender<()>,
176}
177
178impl DocWriter {
179 pub fn id(&self) -> &StreamId {
180 self.inner.head().id()
181 }
182
183 pub fn head(&self) -> &Head {
184 self.inner.head()
185 }
186
187 pub fn commit(&mut self) -> Result<()> {
188 self.inner.commit()?;
189 let _ = self.publisher.try_send(());
190 Ok(())
191 }
192}
193
194impl Write for DocWriter {
195 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
196 self.inner.write(buf)
197 }
198
199 fn flush(&mut self) -> io::Result<()> {
200 self.inner.flush()
201 }
202}
203
204#[cfg(test)]
205mod tests {
206 use super::*;
207 use ipfs_embed::{generate_keypair, Config};
208 use rand::RngCore;
209 use std::io::Read;
210 use std::path::PathBuf;
211 use tempdir::TempDir;
212
213 fn tracing_try_init() {
214 tracing_subscriber::fmt()
215 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
216 .try_init()
217 .ok();
218 }
219
220 fn rand_bytes(size: usize) -> Vec<u8> {
221 let mut rng = rand::thread_rng();
222 let mut data = Vec::with_capacity(size);
223 data.resize(data.capacity(), 0);
224 rng.fill_bytes(&mut data);
225 data
226 }
227
228 async fn create_swarm(path: PathBuf) -> Result<BlakeStreams> {
229 std::fs::create_dir_all(&path)?;
230 let mut config = Config::new(&path, generate_keypair());
231 config.network.broadcast = None;
232 let ipfs = Ipfs::new(config).await?;
233 ipfs.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?.next().await;
234 Ok(BlakeStreams::new(ipfs))
235 }
236
237 #[async_std::test]
238 async fn test_streams() -> anyhow::Result<()> {
239 tracing_try_init();
240 let tmp = TempDir::new("test_streams")?;
241
242 let first = rand_bytes(8192);
243 let second = rand_bytes(8192);
244
245 let server = create_swarm(tmp.path().join("server")).await?;
246 let client = create_swarm(tmp.path().join("client")).await?;
247
248 let doc_id = DocId::unique();
249 let doc = server.subscribe(doc_id).await?;
250 let mut append = doc.append()?;
251 append.write_all(&first)?;
252 append.commit()?;
253
254 let mut stream = client.subscribe(doc_id).await?;
255 client.link_stream(append.id())?;
256
257 let head = stream.next().await.unwrap();
258 let mut buf = vec![];
259 client
260 .slice(head.id(), 0, head.len())?
261 .read_to_end(&mut buf)?;
262 assert_eq!(buf, first);
263
264 append.write_all(&second)?;
265 append.commit()?;
266
267 let head2 = stream.next().await.unwrap();
268 let mut buf = vec![];
269 client
270 .slice(head2.id(), head.len(), head2.len() - head.len())?
271 .read_to_end(&mut buf)?;
272 assert_eq!(buf, second);
273
274 Ok(())
275 }
276}