blake_streams/
lib.rs

1use anyhow::Result;
2use futures::channel::mpsc;
3use futures::prelude::*;
4use ipfs_embed::{Event, GossipEvent, LocalStreamWriter, SignedHead};
5use std::io::{self, Write};
6use std::pin::Pin;
7use std::task::{Context, Poll};
8use zerocopy::{AsBytes, LayoutVerified};
9
10pub type Ipfs = ipfs_embed::Ipfs<ipfs_embed::DefaultParams>;
11pub use blake_streams_core::PeerId;
12pub use ipfs_embed::{self, DocId, Head, StreamId, StreamReader, SwarmEvents};
13
14pub struct BlakeStreams {
15    ipfs: Ipfs,
16}
17
18impl BlakeStreams {
19    pub fn new(ipfs: Ipfs) -> Self {
20        Self { ipfs }
21    }
22
23    pub fn ipfs(&self) -> &Ipfs {
24        &self.ipfs
25    }
26
27    pub fn streams(&self) -> Result<Vec<StreamId>> {
28        self.ipfs.streams()
29    }
30
31    pub fn head(&self, id: &StreamId) -> Result<Option<SignedHead>> {
32        self.ipfs.stream_head(id)
33    }
34
35    pub fn slice(&self, id: &StreamId, start: u64, len: u64) -> Result<StreamReader> {
36        self.ipfs.stream_slice(id, start, len)
37    }
38
39    pub fn remove(&self, id: &StreamId) -> Result<()> {
40        self.ipfs.stream_remove(id)
41    }
42
43    pub fn link_stream(&self, id: &StreamId) -> Result<()> {
44        tracing::info!("{}: linking stream (peer {})", id.doc(), id.peer());
45        self.ipfs.stream_subscribe(id)
46    }
47
48    pub async fn subscribe(&self, doc: DocId) -> Result<DocStream> {
49        tracing::info!("{}: subscribe", doc);
50        let events = self.ipfs.swarm_events();
51        let (mut tx, rx) = mpsc::channel(1);
52        let _ = tx.try_send(());
53        let ipfs = self.ipfs.clone();
54        async_global_executor::spawn(doc_task(
55            doc,
56            ipfs.swarm_events(),
57            ipfs.subscribe(&doc.to_string())?,
58            ipfs,
59            tx.clone(),
60            rx,
61        ))
62        .detach();
63        Ok(DocStream {
64            doc,
65            ipfs: self.ipfs.clone(),
66            events,
67            publisher: tx,
68        })
69    }
70}
71
72async fn doc_task(
73    doc: DocId,
74    mut events: SwarmEvents,
75    mut stream: impl Stream<Item = GossipEvent> + Send + Unpin + 'static,
76    ipfs: Ipfs,
77    mut tx: mpsc::Sender<()>,
78    mut rx: mpsc::Receiver<()>,
79) {
80    let local_peer_id = ipfs.local_public_key().into();
81    loop {
82        futures::select! {
83            ev = rx.next().fuse() => match ev {
84                Some(()) => {
85                    let head = match ipfs.stream_head(&StreamId::new(local_peer_id, doc)) {
86                        Ok(Some(head)) => head,
87                        Ok(None) => continue,
88                        Err(err) => {
89                            tracing::error!("fetching head err {}", err);
90                            continue;
91                        }
92                    };
93                    tracing::info!(
94                        "{}: publish_head (peer {}) (offset {})",
95                        doc,
96                        head.head().id().peer(),
97                        head.head().len(),
98                    );
99                    if let Err(err) = ipfs.publish(&doc.to_string(), head.as_bytes().to_vec()) {
100                        tracing::info!("publish error: {}", err);
101                    }
102                }
103                None => return,
104            },
105            ev = stream.next().fuse() => match ev {
106                Some(GossipEvent::Message(_, head)) => {
107                    match LayoutVerified::<_, SignedHead>::new(&head[..]) {
108                        Some(head) => {
109                            tracing::info!(
110                                "{}: new_head (peer {}) (offset {})",
111                                doc,
112                                head.head().id().peer(),
113                                head.head().len(),
114                            );
115                            ipfs.stream_update_head(*head);
116                        }
117                        None => tracing::debug!("gossip: failed to decode head"),
118                    }
119                }
120                Some(GossipEvent::Subscribed(peer_id)) => {
121                    tracing::info!("{}: new_peer (peer {})", doc, peer_id);
122                    ipfs.stream_add_peers(doc, std::iter::once(peer_id));
123                    let _ = tx.try_send(());
124                }
125                _ => {}
126            },
127            ev = events.next().fuse() => match ev {
128                Some(Event::Discovered(peer_id)) => ipfs.dial(&peer_id),
129                _ => {}
130            }
131        }
132    }
133}
134
135pub struct DocStream {
136    doc: DocId,
137    ipfs: Ipfs,
138    events: SwarmEvents,
139    publisher: mpsc::Sender<()>,
140}
141
142impl Stream for DocStream {
143    type Item = Head;
144
145    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
146        loop {
147            return match Pin::new(&mut self.events).poll_next(cx) {
148                Poll::Ready(Some(Event::NewHead(head))) => {
149                    if head.id().doc() != self.doc {
150                        continue;
151                    }
152                    tracing::info!("{}: sync_complete (offset {})", self.doc, head.len());
153                    Poll::Ready(Some(head))
154                }
155                Poll::Ready(None) => Poll::Ready(None),
156                Poll::Pending => Poll::Pending,
157                _ => continue,
158            };
159        }
160    }
161}
162
163impl DocStream {
164    pub fn append(&self) -> Result<DocWriter> {
165        let writer = self.ipfs.stream_append(self.doc)?;
166        Ok(DocWriter {
167            inner: writer,
168            publisher: self.publisher.clone(),
169        })
170    }
171}
172
173pub struct DocWriter {
174    inner: LocalStreamWriter,
175    publisher: mpsc::Sender<()>,
176}
177
178impl DocWriter {
179    pub fn id(&self) -> &StreamId {
180        self.inner.head().id()
181    }
182
183    pub fn head(&self) -> &Head {
184        self.inner.head()
185    }
186
187    pub fn commit(&mut self) -> Result<()> {
188        self.inner.commit()?;
189        let _ = self.publisher.try_send(());
190        Ok(())
191    }
192}
193
194impl Write for DocWriter {
195    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
196        self.inner.write(buf)
197    }
198
199    fn flush(&mut self) -> io::Result<()> {
200        self.inner.flush()
201    }
202}
203
#[cfg(test)]
mod tests {
    use super::*;
    use ipfs_embed::{generate_keypair, Config};
    use rand::RngCore;
    use std::io::Read;
    use std::path::PathBuf;
    use tempdir::TempDir;

    /// Installs a tracing subscriber configured from the environment; a
    /// failure (e.g. one is already installed) is ignored.
    fn tracing_try_init() {
        tracing_subscriber::fmt()
            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
            .try_init()
            .ok();
    }

    /// Returns exactly `size` random bytes.
    fn rand_bytes(size: usize) -> Vec<u8> {
        // Size the buffer by length, not by capacity: `Vec::with_capacity`
        // may allocate more than requested, which would yield extra bytes.
        let mut data = vec![0u8; size];
        rand::thread_rng().fill_bytes(&mut data);
        data
    }

    /// Creates a node storing its data under `path`, with broadcast disabled,
    /// listening on an OS-assigned TCP port.
    async fn create_swarm(path: PathBuf) -> Result<BlakeStreams> {
        std::fs::create_dir_all(&path)?;
        let mut config = Config::new(&path, generate_keypair());
        config.network.broadcast = None;
        let ipfs = Ipfs::new(config).await?;
        // Wait for the first listener event before handing the node out.
        ipfs.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?.next().await;
        Ok(BlakeStreams::new(ipfs))
    }

    /// A client that links a server's stream receives both the data committed
    /// before it subscribed and the data committed afterwards.
    #[async_std::test]
    async fn test_streams() -> anyhow::Result<()> {
        tracing_try_init();
        let tmp = TempDir::new("test_streams")?;

        let first = rand_bytes(8192);
        let second = rand_bytes(8192);

        let server = create_swarm(tmp.path().join("server")).await?;
        let client = create_swarm(tmp.path().join("client")).await?;

        let doc_id = DocId::unique();
        let doc = server.subscribe(doc_id).await?;
        let mut append = doc.append()?;
        append.write_all(&first)?;
        append.commit()?;

        let mut stream = client.subscribe(doc_id).await?;
        client.link_stream(append.id())?;

        let head = stream.next().await.expect("first head");
        let mut buf = vec![];
        client
            .slice(head.id(), 0, head.len())?
            .read_to_end(&mut buf)?;
        assert_eq!(buf, first);

        append.write_all(&second)?;
        append.commit()?;

        let head2 = stream.next().await.expect("second head");
        let mut buf = vec![];
        // Read only the part appended since the first head.
        client
            .slice(head2.id(), head.len(), head2.len() - head.len())?
            .read_to_end(&mut buf)?;
        assert_eq!(buf, second);

        Ok(())
    }
}
276}