Skip to main content

co_primitives/library/
unixfs.rs

1// SPDX-License-Identifier: AGPL-3.0-only
2// Copyright (C) 2026 1io BRANDGUARDIAN GmbH
3
4use crate::{AnyBlockStorage, Block, BlockStorage, KnownMultiCodec, MultiCodec, StorageError};
5use cid::Cid;
6use futures::{AsyncRead, AsyncReadExt};
7use rust_unixfs::file::{adder::FileAdder, visit::IdleFileVisit};
8
9/// Read unixfs file into buffer.
10///
11/// See: https://github.com/dariusc93/rust-ipfs/blob/libp2p-next/unixfs/examples/cat.rs
12pub async fn unixfs_cat_buffer(storage: &impl AnyBlockStorage, cid: &Cid) -> Result<Vec<u8>, StorageError> {
13	let mut result = Vec::new();
14
15	// The blockstore specific way of reading the block. Here we assume go-ipfs 0.5 default flatfs
16	// configuration, which puts the files at sharded directories and names the blocks as base32
17	// upper and a suffix of "data".
18	//
19	// For the ipfs-unixfs it is important that the raw block data lives long enough that the
20	// possible content gets to be processed, at minimum one step of the walk as shown in this
21	// example.
22	let mut buf = Vec::new();
23	buf.append(
24		&mut storage
25			.get(MultiCodec::with_codec(KnownMultiCodec::DagPb, cid)?)
26			.await?
27			.into_inner()
28			.1,
29	);
30
31	// First step of the walk can give content or continued visitation but not both.
32	let (content, _, _metadata, mut step) = IdleFileVisit::default()
33		.start(&buf)
34		.map_err(|e| StorageError::Internal(e.into()))?;
35	result.extend_from_slice(content);
36
37	// Following steps repeat the same pattern:
38	while let Some(visit) = step {
39		// Read the next link. The `pending_links()` gives the next link and an iterator over the
40		// following links. The iterator lists the known links in the order of traversal, with the
41		// exception of possible new links appearing before the older.
42		let (first, _) = visit.pending_links();
43
44		buf.clear();
45		buf.append(&mut storage.get(first).await?.into_inner().1);
46
47		// Similar to first step, except we no longer get the file metadata. It is still accessible
48		// from the `visit` via `AsRef<ipfs_unixfs::file::Metadata>` but likely only needed in
49		// the first step.
50		let (content, next_step) = visit
51			.continue_walk(&buf, &mut None)
52			.map_err(|e| StorageError::Internal(e.into()))?;
53		result.extend_from_slice(content);
54
55		// Using a while loop combined with `let Some(visit) = step` allows for easy walking.
56		step = next_step;
57	}
58
59	// result
60	Ok(result)
61}
62
63/// Add stream as unixfs file to storage.
64/// The last CID in the result is the root.
65///
66/// See: https://github.com/dariusc93/rust-ipfs/blob/libp2p-next/unixfs/examples/add.rs
67pub async fn unixfs_add<I>(storage: &impl AnyBlockStorage, stream: &mut I) -> Result<Vec<Cid>, StorageError>
68where
69	I: AsyncRead + Unpin,
70{
71	let mut result = Vec::new();
72	let mut adder = FileAdder::default();
73	let mut buf = vec![0u8; 16384];
74	loop {
75		// read
76		let bytes = stream.read(&mut buf).await.map_err(|e| StorageError::Internal(e.into()))?;
77		if bytes == 0 {
78			let blocks = adder.finish();
79			add_blocks(storage, blocks, &mut result).await?;
80			break;
81		}
82
83		// process
84		let mut total = 0;
85		while total < bytes {
86			let (blocks, consumed) = adder.push(&buf[total..bytes]);
87			add_blocks(storage, blocks, &mut result).await?;
88			total += consumed;
89		}
90	}
91	Ok(result)
92}
93
94/// Encode buffer into blocks.
95/// The last block in the result is the root.
96pub fn unixfs_encode_buffer(buf: &[u8]) -> Vec<Block> {
97	let mut result = Vec::new();
98	let mut adder = FileAdder::default();
99
100	// push
101	let mut total = 0;
102	while total < buf.len() {
103		let (blocks, consumed) = adder.push(&buf[total..]);
104		for (cid, data) in blocks {
105			result.push(Block::new_unchecked(cid, data));
106		}
107		total += consumed;
108	}
109
110	// finish
111	for (cid, data) in adder.finish() {
112		result.push(Block::new_unchecked(cid, data));
113	}
114
115	// result
116	result
117}
118
119/// Add blocks to storage and add its CID's to `cids`.
120async fn add_blocks<S>(
121	storage: &S,
122	blocks: impl Iterator<Item = (Cid, Vec<u8>)>,
123	cids: &mut Vec<Cid>,
124) -> Result<(), StorageError>
125where
126	S: BlockStorage + Send,
127{
128	for (cid, data) in blocks {
129		let block = Block::new_unchecked(cid, data);
130		let cid = storage.set(block).await?;
131		cids.push(cid);
132	}
133	Ok(())
134}
135
#[cfg(test)]
mod tests {
	use super::unixfs_encode_buffer;
	use crate::{unixfs_add, unixfs_cat_buffer, TestStorage};
	use cid::Cid;
	use futures::io::Cursor;
	use std::str::FromStr;

	/// 1 MiB of repeated text (16 bytes * 64 * 1024).
	fn test_data() -> String {
		"hello world test".repeat(64).repeat(1024)
	}

	/// Adding 1 MiB of data yields four leaf blocks followed by the root.
	/// The leaf blocks share a CID because they contain identical data.
	#[tokio::test]
	async fn test_unixfs_add() {
		let storage = TestStorage::default();
		let mut stream = Cursor::new(test_data().into_bytes());
		let cids = unixfs_add(&storage, &mut stream).await.unwrap();
		assert_eq!(5, cids.len());
		let leaf = Cid::from_str("QmPEvxGmvxzfMews81gF5NMvFNeFAdNmhtwzGPhkHhoyqy").unwrap();
		for cid in &cids[..4] {
			assert_eq!(*cid, leaf);
		}
		assert_eq!(cids[4], Cid::from_str("QmVRRmYKvn8m3jQT8VHX1BCgrQLFvzsB26aKwLCyFRvYSv").unwrap());
	}

	/// An empty stream still produces a single (empty file) root block.
	#[tokio::test]
	async fn test_unixfs_add_empty() {
		let storage = TestStorage::default();
		let mut stream = Cursor::new([]);
		let cids = unixfs_add(&storage, &mut stream).await.unwrap();
		assert_eq!(1, cids.len());
		assert_eq!(cids[0], Cid::from_str("QmbFMke1KXqnYyBBWxB74N4c5SBnJMVAiMNRcGu6x1AwQH").unwrap());
	}

	/// Reading back the root of an added file returns the original bytes.
	#[tokio::test]
	async fn test_unixfs_cat_buffer() {
		let storage = TestStorage::default();
		let data = test_data();
		let mut stream = Cursor::new(data.as_bytes().to_vec());
		let cids = unixfs_add(&storage, &mut stream).await.unwrap();
		let buffer = unixfs_cat_buffer(&storage, cids.last().unwrap()).await.unwrap();
		assert_eq!(data.as_bytes().to_vec(), buffer);
	}

	/// In-memory encoding produces the same number of blocks as `unixfs_add` for the same input.
	#[test]
	fn test_unixfs_encode_buffer() {
		assert_eq!(5, unixfs_encode_buffer(test_data().as_bytes()).len());
		assert_eq!(1, unixfs_encode_buffer(&[]).len());
	}
}