noosphere_sphere/replication/
car.rs1use anyhow::Result;
2use async_stream::try_stream;
3use bytes::Bytes;
4use cid::Cid;
5use futures_util::sink::SinkExt;
6use iroh_car::{CarHeader, CarWriter};
7use std::io::{Error as IoError, ErrorKind as IoErrorKind};
8use tokio::sync::mpsc::channel;
9use tokio_stream::Stream;
10use tokio_util::{
11 io::{CopyToBytes, SinkWriter},
12 sync::PollSender,
13};
14
15pub fn car_stream<S>(
19 mut roots: Vec<Cid>,
20 block_stream: S,
21) -> impl Stream<Item = Result<Bytes, IoError>> + Send
22where
23 S: Stream<Item = Result<(Cid, Vec<u8>)>> + Send,
24{
25 if roots.is_empty() {
26 roots = vec![Cid::default()]
27 }
28
29 try_stream! {
30 let (tx, mut rx) = channel::<Bytes>(16);
31 let sink =
32 PollSender::new(tx).sink_map_err(|error| {
33 error!("Failed to send CAR frame: {}", error);
34 IoError::from(IoErrorKind::BrokenPipe)
35 });
36
37 let mut car_buffer = SinkWriter::new(CopyToBytes::new(sink));
38 let car_header = CarHeader::new_v1(roots);
39 let mut car_writer = CarWriter::new(car_header, &mut car_buffer);
40 let mut sent_blocks = false;
41
42 for await item in block_stream {
43 sent_blocks = true;
44 let (cid, block) = item.map_err(|error| {
45 error!("Failed to stream blocks: {}", error);
46 IoError::from(IoErrorKind::BrokenPipe)
47 })?;
48
49 car_writer.write(cid, block).await.map_err(|error| {
50 error!("Failed to write CAR frame: {}", error);
51 IoError::from(IoErrorKind::BrokenPipe)
52 })?;
53
54 car_writer.flush().await.map_err(|error| {
55 error!("Failed to flush CAR frames: {}", error);
56 IoError::from(IoErrorKind::BrokenPipe)
57 })?;
58
59 while let Ok(block) = rx.try_recv() {
60 yield block;
61 }
62 }
63
64 if !sent_blocks {
65 car_writer.write_header().await.map_err(|error| {
66 error!("Failed to write CAR frame: {}", error);
67 IoError::from(IoErrorKind::BrokenPipe)
68 })?;
69 car_writer.flush().await.map_err(|error| {
70 error!("Failed to flush CAR frames: {}", error);
71 IoError::from(IoErrorKind::BrokenPipe)
72 })?;
73
74 while let Ok(block) = rx.try_recv() {
75 yield block;
76 }
77 }
78 }
79}