guardian_db/ipfs_core_api/
compat.rs1use crate::error::Result;
6use crate::ipfs_core_api::client::IpfsClient;
7use cid::Cid;
8use std::pin::Pin;
9use tokio::io::AsyncReadExt;
10
11pub struct ConcatStream {
13 inner: Option<Pin<Box<dyn tokio::io::AsyncRead + Send>>>,
14 error: Option<crate::error::GuardianError>,
15}
16
17impl ConcatStream {
18 pub fn new(reader: Pin<Box<dyn tokio::io::AsyncRead + Send>>) -> Self {
20 Self {
21 inner: Some(reader),
22 error: None,
23 }
24 }
25
26 pub fn error(error: crate::error::GuardianError) -> Self {
28 Self {
29 inner: None,
30 error: Some(error),
31 }
32 }
33
34 pub async fn concat(mut self) -> Result<Vec<u8>> {
37 if let Some(error) = self.error {
38 return Err(error);
39 }
40
41 if let Some(mut reader) = self.inner.take() {
42 let mut buffer = Vec::new();
43 reader.read_to_end(&mut buffer).await.map_err(|e| {
44 crate::error::GuardianError::Other(format!("Falha ao ler dados do stream: {}", e))
45 })?;
46 Ok(buffer)
47 } else {
48 Ok(Vec::new())
49 }
50 }
51
52 pub fn size_hint(&self) -> (usize, Option<usize>) {
54 (0, None)
55 }
56}
57
58pub struct IpfsClientAdapter {
60 client: IpfsClient,
61}
62
63impl IpfsClientAdapter {
64 pub fn new(client: IpfsClient) -> Self {
66 Self { client }
67 }
68
69 pub async fn default() -> Result<Self> {
71 let client = IpfsClient::default().await?;
72 Ok(Self::new(client))
73 }
74
75 pub async fn development() -> Result<Self> {
77 let client = IpfsClient::development().await?;
78 Ok(Self::new(client))
79 }
80
81 pub async fn production() -> Result<Self> {
83 let client = IpfsClient::production().await?;
84 Ok(Self::new(client))
85 }
86
87 pub async fn add<R>(&self, data: R) -> Result<AddResponse>
89 where
90 R: tokio::io::AsyncRead + Send + Unpin + 'static,
91 {
92 self.client.add(data).await
93 }
94
95 pub async fn cat(&self, hash: &str) -> ConcatStream {
98 match self.client.cat(hash).await {
99 Ok(stream) => ConcatStream::new(stream),
100 Err(e) => ConcatStream::error(e),
101 }
102 }
103
104 pub async fn dag_get(&self, cid: &Cid, path: Option<&str>) -> Result<Vec<u8>> {
106 self.client.dag_get(cid, path).await
107 }
108
109 pub async fn dag_put(&self, data: &[u8]) -> Result<Cid> {
111 self.client.dag_put(data).await
112 }
113
114 pub async fn pubsub_publish(&self, topic: &str, data: &[u8]) -> Result<()> {
116 self.client.pubsub_publish(topic, data).await
117 }
118
119 pub async fn pubsub_subscribe(&self, topic: &str) -> Result<PubsubStream> {
121 self.client.pubsub_subscribe(topic).await
122 }
123
124 pub async fn pubsub_peers(&self, topic: &str) -> Result<Vec<libp2p::PeerId>> {
126 self.client.pubsub_peers(topic).await
127 }
128
129 pub async fn pubsub_topics(&self) -> Result<Vec<String>> {
131 self.client.pubsub_topics().await
132 }
133
134 pub async fn swarm_connect(&self, peer: &libp2p::PeerId) -> Result<()> {
136 self.client.swarm_connect(peer).await
137 }
138
139 pub async fn swarm_peers(&self) -> Result<Vec<PeerInfo>> {
141 self.client.swarm_peers().await
142 }
143
144 pub async fn id(&self) -> Result<NodeInfo> {
146 self.client.id().await
147 }
148
149 pub async fn pin_add(&self, hash: &str, recursive: bool) -> Result<PinResponse> {
151 self.client.pin_add(hash, recursive).await
152 }
153
154 pub async fn pin_rm(&self, hash: &str) -> Result<PinResponse> {
156 self.client.pin_rm(hash).await
157 }
158
159 pub async fn pin_ls(&self, pin_type: Option<PinType>) -> Result<Vec<PinResponse>> {
161 self.client.pin_ls(pin_type).await
162 }
163
164 pub async fn repo_stat(&self) -> Result<RepoStats> {
166 self.client.repo_stat().await
167 }
168
169 pub async fn is_online(&self) -> bool {
171 self.client.is_online().await
172 }
173
174 pub async fn shutdown(&self) -> Result<()> {
176 self.client.shutdown().await
177 }
178
179 pub fn inner(&self) -> &IpfsClient {
181 &self.client
182 }
183
184 pub fn get_channel_id(&self, other_peer: &libp2p::PeerId) -> String {
186 self.client.get_channel_id(other_peer)
187 }
188}
189
190pub async fn from_str(addr: &str) -> Result<IpfsClientAdapter> {
193 tracing::warn!(
196 "Ignorando endereço HTTP '{}' - usando cliente IPFS nativo",
197 addr
198 );
199
200 IpfsClientAdapter::default().await
201}
202
203pub async fn from_url(url: &str) -> Result<IpfsClientAdapter> {
206 from_str(url).await
207}
208
209#[allow(async_fn_in_trait)]
211pub trait IpfsClientCompat {
212 async fn add_compat<R>(&self, data: R) -> Result<AddResponse>
213 where
214 R: tokio::io::AsyncRead + Send + Unpin + 'static;
215
216 async fn cat_compat(&self, hash: &str) -> Result<Vec<u8>>;
217}
218
219impl IpfsClientCompat for IpfsClient {
220 async fn add_compat<R>(&self, data: R) -> Result<AddResponse>
221 where
222 R: tokio::io::AsyncRead + Send + Unpin + 'static,
223 {
224 self.add(data).await
225 }
226
227 async fn cat_compat(&self, hash: &str) -> Result<Vec<u8>> {
228 let mut stream = self.cat(hash).await?;
229 let mut buffer = Vec::new();
230 stream.read_to_end(&mut buffer).await.map_err(|e| {
231 crate::error::GuardianError::Other(format!("Failed to read stream: {}", e))
232 })?;
233 Ok(buffer)
234 }
235}
236
/// Expands to a freshly created default `IpfsClientAdapter`.
///
/// The `$old_client` expression is accepted for call-site compatibility but
/// does not appear in the expansion, so it is never evaluated.
///
/// The expansion uses `.await` and `?`, so the macro must be invoked inside
/// an `async` context whose return type can absorb the adapter's error.
#[macro_export]
macro_rules! migrate_ipfs_client {
    ($old_client:expr) => {{
        $crate::ipfs_core_api::compat::IpfsClientAdapter::default().await?
    }};
}
245
246pub use crate::ipfs_core_api::types::{
248 AddResponse, NodeInfo, PeerInfo, PinResponse, PinType, PubsubMessage, PubsubStream, RepoStats,
249};
250
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// An adapter can be built around a client created from a development
    /// configuration that points at a per-run data directory.
    #[tokio::test]
    async fn test_adapter_creation() {
        // Nanosecond timestamp keeps the data directory distinct between runs.
        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_nanos();

        let mut config = crate::ipfs_core_api::ClientConfig::development();
        let store_dir = std::path::PathBuf::from(format!("./tmp/ipfs_test_{}", nanos));
        config.data_store_path = Some(store_dir);

        let client = match crate::ipfs_core_api::IpfsClient::new(config).await {
            Ok(client) => client,
            Err(e) => {
                eprintln!("Client creation failed: {}", e);
                panic!("Failed to create IPFS client");
            }
        };

        let adapter = IpfsClientAdapter::new(client);
        assert!(adapter.client.config().enable_pubsub);
    }

    /// `concat` returns exactly the bytes offered by the reader.
    #[tokio::test]
    async fn test_concat_stream() {
        let test_data = b"Hello, concat stream!";
        let reader = tokio::io::BufReader::new(Cursor::new(test_data.to_vec()));

        let collected = ConcatStream::new(Box::pin(reader))
            .concat()
            .await
            .unwrap();

        assert_eq!(collected, test_data);
    }

    /// A stream built from an error reports that error from `concat`.
    #[tokio::test]
    async fn test_concat_stream_error() {
        let stream =
            ConcatStream::error(crate::error::GuardianError::Other("Test error".to_string()));

        assert!(stream.concat().await.is_err());
    }

    /// Content added through the adapter can be read back with `cat`.
    #[tokio::test]
    #[ignore]
    async fn test_adapter_add_and_cat() {
        let adapter = IpfsClientAdapter::development().await.unwrap();

        let payload = "Hello, adapter!".as_bytes();
        let added = adapter.add(Cursor::new(payload.to_vec())).await.unwrap();
        assert!(!added.hash.is_empty());

        let fetched = adapter.cat(&added.hash).await.concat().await.unwrap();
        assert_eq!(payload, fetched.as_slice());
    }

    /// Publishing succeeds; topic and peer listings are expected to be empty.
    #[tokio::test]
    #[ignore]
    async fn test_adapter_pubsub() {
        let adapter = IpfsClientAdapter::development().await.unwrap();

        let published = adapter.pubsub_publish("test-topic", b"test message").await;
        assert!(published.is_ok());

        assert!(adapter.pubsub_topics().await.unwrap().is_empty());
        assert!(adapter.pubsub_peers("test-topic").await.unwrap().is_empty());
    }

    /// `from_str` either fails (tolerated and logged) or yields an online
    /// adapter.
    #[tokio::test]
    async fn test_from_str() {
        let adapter = match from_str("http://localhost:5001").await {
            Ok(adapter) => adapter,
            failed => {
                println!(
                    "Adapter creation failed (expected without IPFS daemon): {:?}",
                    failed.err()
                );
                return;
            }
        };

        assert!(adapter.is_online().await);
    }

    /// The compat trait round-trips content through `add_compat` and
    /// `cat_compat`.
    #[tokio::test]
    #[ignore]
    async fn test_compat_trait() {
        let client = IpfsClient::development().await.unwrap();

        let payload = "compat trait test".as_bytes();
        let added = client
            .add_compat(Cursor::new(payload.to_vec()))
            .await
            .unwrap();
        let fetched = client.cat_compat(&added.hash).await.unwrap();

        assert_eq!(payload, fetched.as_slice());
    }

    /// Data stored with `dag_put` is returned unchanged by `dag_get`.
    #[tokio::test]
    #[ignore]
    async fn test_dag_operations() {
        let adapter = IpfsClientAdapter::development().await.unwrap();

        let test_data = b"test dag adapter";
        let cid = adapter.dag_put(test_data).await.unwrap();
        let fetched = adapter.dag_get(&cid, None).await.unwrap();

        assert_eq!(test_data, fetched.as_slice());
    }

    /// Pinning, listing and unpinning behave consistently for one object.
    #[tokio::test]
    #[ignore]
    async fn test_pin_operations() {
        let adapter = IpfsClientAdapter::development().await.unwrap();

        let payload = "pin adapter test".as_bytes();
        let added = adapter.add(Cursor::new(payload.to_vec())).await.unwrap();

        let pinned = adapter.pin_add(&added.hash, true).await.unwrap();
        assert_eq!(pinned.pin_type, PinType::Recursive);

        let listed = adapter.pin_ls(None).await.unwrap();
        assert_eq!(listed.len(), 1);

        let removed = adapter.pin_rm(&added.hash).await.unwrap();
        assert_eq!(removed.hash, added.hash);
    }

    /// The node reports a non-empty agent version that mentions
    /// `guardian-db`.
    #[tokio::test]
    #[ignore]
    async fn test_node_info() {
        let adapter = IpfsClientAdapter::development().await.unwrap();

        let agent_version = adapter.id().await.unwrap().agent_version;
        assert!(!agent_version.is_empty());
        assert!(agent_version.contains("guardian-db"));
    }

    /// The adapter and the wrapped client produce the same channel id.
    #[tokio::test]
    #[ignore]
    async fn test_channel_id_compatibility() {
        let adapter = IpfsClientAdapter::development().await.unwrap();
        let other_peer = libp2p::PeerId::random();

        let via_adapter = adapter.get_channel_id(&other_peer);
        assert!(via_adapter.starts_with("/ipfs-pubsub-direct-channel/v1/"));

        let via_client = adapter.inner().get_channel_id(&other_peer);
        assert_eq!(via_adapter, via_client);
    }
}