// guardian_db/ipfs_core_api/compat.rs

// Compatibility layer
//
// Maintains compatibility with existing code that used ipfs_api_backend_hyper
4
5use crate::error::Result;
6use crate::ipfs_core_api::client::IpfsClient;
7use cid::Cid;
8use std::pin::Pin;
9use tokio::io::AsyncReadExt;
10
/// Compatibility stream that offers a `concat()` method similar to the one in
/// ipfs_api_backend_hyper.
///
/// The constructors in this file fill exactly one of the two fields: `new`
/// stores a reader and `error` stores a failure.
pub struct ConcatStream {
    // Reader whose contents `concat()` collects; `None` when built via `error`.
    inner: Option<Pin<Box<dyn tokio::io::AsyncRead + Send>>>,
    // Failure that `concat()` hands back as `Err`; `None` when built via `new`.
    error: Option<crate::error::GuardianError>,
}
16
17impl ConcatStream {
18    /// Cria um novo ConcatStream com um reader
19    pub fn new(reader: Pin<Box<dyn tokio::io::AsyncRead + Send>>) -> Self {
20        Self {
21            inner: Some(reader),
22            error: None,
23        }
24    }
25
26    /// Cria um ConcatStream com erro
27    pub fn error(error: crate::error::GuardianError) -> Self {
28        Self {
29            inner: None,
30            error: Some(error),
31        }
32    }
33
34    /// Método compatível que coleta todos os dados em um Vec<u8>
35    /// Similar ao concat() do ipfs_api_backend_hyper
36    pub async fn concat(mut self) -> Result<Vec<u8>> {
37        if let Some(error) = self.error {
38            return Err(error);
39        }
40
41        if let Some(mut reader) = self.inner.take() {
42            let mut buffer = Vec::new();
43            reader.read_to_end(&mut buffer).await.map_err(|e| {
44                crate::error::GuardianError::Other(format!("Falha ao ler dados do stream: {}", e))
45            })?;
46            Ok(buffer)
47        } else {
48            Ok(Vec::new())
49        }
50    }
51
52    /// Método para obter o tamanho estimado (compatibilidade)
53    pub fn size_hint(&self) -> (usize, Option<usize>) {
54        (0, None)
55    }
56}
57
/// Adapter meant to keep full compatibility with
/// `ipfs_api_backend_hyper::IpfsClient`.
pub struct IpfsClientAdapter {
    // Wrapped client that every adapter method forwards to.
    client: IpfsClient,
}
62
63impl IpfsClientAdapter {
64    /// Cria um novo adaptador com um cliente IPFS
65    pub fn new(client: IpfsClient) -> Self {
66        Self { client }
67    }
68
69    /// Cria adaptador com configuração padrão
70    pub async fn default() -> Result<Self> {
71        let client = IpfsClient::default().await?;
72        Ok(Self::new(client))
73    }
74
75    /// Cria adaptador para desenvolvimento
76    pub async fn development() -> Result<Self> {
77        let client = IpfsClient::development().await?;
78        Ok(Self::new(client))
79    }
80
81    /// Cria adaptador para produção
82    pub async fn production() -> Result<Self> {
83        let client = IpfsClient::production().await?;
84        Ok(Self::new(client))
85    }
86
87    /// Método compatível com ipfs_api_backend_hyper::IpfsClient::add
88    pub async fn add<R>(&self, data: R) -> Result<AddResponse>
89    where
90        R: tokio::io::AsyncRead + Send + Unpin + 'static,
91    {
92        self.client.add(data).await
93    }
94
95    /// Método compatível com ipfs_api_backend_hyper::IpfsClient::cat
96    /// Retorna ConcatStream para manter compatibilidade total
97    pub async fn cat(&self, hash: &str) -> ConcatStream {
98        match self.client.cat(hash).await {
99            Ok(stream) => ConcatStream::new(stream),
100            Err(e) => ConcatStream::error(e),
101        }
102    }
103
104    /// Método compatível com dag_get
105    pub async fn dag_get(&self, cid: &Cid, path: Option<&str>) -> Result<Vec<u8>> {
106        self.client.dag_get(cid, path).await
107    }
108
109    /// Método compatível com dag_put
110    pub async fn dag_put(&self, data: &[u8]) -> Result<Cid> {
111        self.client.dag_put(data).await
112    }
113
114    /// Método compatível para pubsub publish
115    pub async fn pubsub_publish(&self, topic: &str, data: &[u8]) -> Result<()> {
116        self.client.pubsub_publish(topic, data).await
117    }
118
119    /// Método compatível para pubsub subscribe
120    pub async fn pubsub_subscribe(&self, topic: &str) -> Result<PubsubStream> {
121        self.client.pubsub_subscribe(topic).await
122    }
123
124    /// Método compatível para pubsub peers
125    pub async fn pubsub_peers(&self, topic: &str) -> Result<Vec<libp2p::PeerId>> {
126        self.client.pubsub_peers(topic).await
127    }
128
129    /// Método compatível para pubsub topics
130    pub async fn pubsub_topics(&self) -> Result<Vec<String>> {
131        self.client.pubsub_topics().await
132    }
133
134    /// Método compatível para swarm connect
135    pub async fn swarm_connect(&self, peer: &libp2p::PeerId) -> Result<()> {
136        self.client.swarm_connect(peer).await
137    }
138
139    /// Método compatível para swarm peers
140    pub async fn swarm_peers(&self) -> Result<Vec<PeerInfo>> {
141        self.client.swarm_peers().await
142    }
143
144    /// Método compatível para node id
145    pub async fn id(&self) -> Result<NodeInfo> {
146        self.client.id().await
147    }
148
149    /// Método compatível para pin add
150    pub async fn pin_add(&self, hash: &str, recursive: bool) -> Result<PinResponse> {
151        self.client.pin_add(hash, recursive).await
152    }
153
154    /// Método compatível para pin rm
155    pub async fn pin_rm(&self, hash: &str) -> Result<PinResponse> {
156        self.client.pin_rm(hash).await
157    }
158
159    /// Método compatível para pin ls
160    pub async fn pin_ls(&self, pin_type: Option<PinType>) -> Result<Vec<PinResponse>> {
161        self.client.pin_ls(pin_type).await
162    }
163
164    /// Método compatível para repo stat
165    pub async fn repo_stat(&self) -> Result<RepoStats> {
166        self.client.repo_stat().await
167    }
168
169    /// Verifica se está online
170    pub async fn is_online(&self) -> bool {
171        self.client.is_online().await
172    }
173
174    /// Shutdown do cliente
175    pub async fn shutdown(&self) -> Result<()> {
176        self.client.shutdown().await
177    }
178
179    /// Acesso ao cliente interno
180    pub fn inner(&self) -> &IpfsClient {
181        &self.client
182    }
183
184    /// Gera channel ID (compatibilidade com one_on_one_channel.rs)
185    pub fn get_channel_id(&self, other_peer: &libp2p::PeerId) -> String {
186        self.client.get_channel_id(other_peer)
187    }
188}
189
/// Convenience function that creates an adapter from a string configuration.
/// Compatible with `IpfsClient::from_str()` from ipfs_api_backend_hyper.
///
/// `addr` is only logged in a warning; the adapter itself is always created
/// through `IpfsClientAdapter::default()`.
///
/// # Errors
///
/// Returns whatever error `IpfsClientAdapter::default()` produces.
pub async fn from_str(addr: &str) -> Result<IpfsClientAdapter> {
    // Ignores the HTTP address and creates the native client
    // In production, the address could be parsed to extract configuration
    tracing::warn!(
        "Ignorando endereço HTTP '{}' - usando cliente IPFS nativo",
        addr
    );

    IpfsClientAdapter::default().await
}
202
/// Creates an adapter from a URL-based configuration.
/// Provides a smoother migration path for existing code.
///
/// Simply forwards `url` to [`from_str`], so it shares that function's
/// behaviour and errors.
pub async fn from_url(url: &str) -> Result<IpfsClientAdapter> {
    from_str(url).await
}
208
/// Trait that eases the migration of existing code.
// NOTE(review): `async_fn_in_trait` is silenced without a stated reason. The
// lint warns that callers of a public trait cannot require auto traits such
// as `Send` on the returned futures — confirm that is acceptable here.
#[allow(async_fn_in_trait)]
pub trait IpfsClientCompat {
    /// Compat counterpart of `add`: takes an async reader and yields an
    /// `AddResponse`.
    async fn add_compat<R>(&self, data: R) -> Result<AddResponse>
    where
        R: tokio::io::AsyncRead + Send + Unpin + 'static;

    /// Compat counterpart of `cat` that yields the content for `hash` as a
    /// `Vec<u8>` rather than a stream.
    async fn cat_compat(&self, hash: &str) -> Result<Vec<u8>>;
}
218
219impl IpfsClientCompat for IpfsClient {
220    async fn add_compat<R>(&self, data: R) -> Result<AddResponse>
221    where
222        R: tokio::io::AsyncRead + Send + Unpin + 'static,
223    {
224        self.add(data).await
225    }
226
227    async fn cat_compat(&self, hash: &str) -> Result<Vec<u8>> {
228        let mut stream = self.cat(hash).await?;
229        let mut buffer = Vec::new();
230        stream.read_to_end(&mut buffer).await.map_err(|e| {
231            crate::error::GuardianError::Other(format!("Failed to read stream: {}", e))
232        })?;
233        Ok(buffer)
234    }
235}
236
/// Macro that eases the migration of existing code: expands to a freshly
/// created default `IpfsClientAdapter`.
///
/// The `$old_client` argument is matched but does not appear in the
/// expansion, so the expression passed in is never evaluated. The expansion
/// contains `.await?`, so the macro has to be invoked inside an `async`
/// context whose return type can take the adapter-creation error through `?`.
#[macro_export]
macro_rules! migrate_ipfs_client {
    ($old_client:expr) => {{
        // Replaces ipfs_api_backend_hyper::IpfsClient with our native client
        $crate::ipfs_core_api::compat::IpfsClientAdapter::default().await?
    }};
}
245
246/// Re-exports para compatibilidade completa
247pub use crate::ipfs_core_api::types::{
248    AddResponse, NodeInfo, PeerInfo, PinResponse, PinType, PubsubMessage, PubsubStream, RepoStats,
249};
250
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Creates a client from the development config with a per-run data-store
    /// path and checks that the adapter wraps it.
    #[tokio::test]
    async fn test_adapter_creation() {
        // Nanosecond timestamp used to give each run its own data-store path.
        let unique_id = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_nanos();

        let mut config = crate::ipfs_core_api::ClientConfig::development();
        config.data_store_path = Some(std::path::PathBuf::from(format!(
            "./tmp/ipfs_test_{}",
            unique_id
        )));

        let client = crate::ipfs_core_api::IpfsClient::new(config).await;
        // Print the failure before the assertion so the cause is visible.
        if let Err(ref e) = client {
            eprintln!("Client creation failed: {}", e);
        }
        assert!(client.is_ok(), "Failed to create IPFS client");

        let adapter = IpfsClientAdapter::new(client.unwrap());
        // Test that adapter is created successfully
        assert!(adapter.client.config().enable_pubsub);
    }

    /// `concat()` returns exactly the bytes supplied by the wrapped reader.
    #[tokio::test]
    async fn test_concat_stream() {
        let test_data = b"Hello, concat stream!";
        let cursor = tokio::io::BufReader::new(std::io::Cursor::new(test_data.to_vec()));

        let stream = ConcatStream::new(Box::pin(cursor));
        let result = stream.concat().await.unwrap();

        assert_eq!(result, test_data);
    }

    /// A stream built with `ConcatStream::error` makes `concat()` fail.
    #[tokio::test]
    async fn test_concat_stream_error() {
        let error = crate::error::GuardianError::Other("Test error".to_string());
        let stream = ConcatStream::error(error);

        let result = stream.concat().await;
        assert!(result.is_err());
    }

    /// Round trip: data added through the adapter is read back via `cat` +
    /// `concat()`.
    #[tokio::test]
    #[ignore] // Requires running IPFS daemon
    async fn test_adapter_add_and_cat() {
        let adapter = IpfsClientAdapter::development().await.unwrap();

        let test_data = "Hello, adapter!".as_bytes();
        let cursor = Cursor::new(test_data.to_vec());

        // Test add
        let response = adapter.add(cursor).await.unwrap();
        assert!(!response.hash.is_empty());

        // Test cat with ConcatStream
        let stream = adapter.cat(&response.hash).await;
        let retrieved_data = stream.concat().await.unwrap();

        assert_eq!(test_data, retrieved_data.as_slice());
    }

    /// Publishes a message, then expects both the topic list and the peer
    /// list for the topic to be empty.
    #[tokio::test]
    #[ignore] // Requires running IPFS daemon
    async fn test_adapter_pubsub() {
        let adapter = IpfsClientAdapter::development().await.unwrap();

        // Test publish
        let result = adapter.pubsub_publish("test-topic", b"test message").await;
        assert!(result.is_ok());

        // Test topics
        let topics = adapter.pubsub_topics().await.unwrap();
        assert!(topics.is_empty());

        // Test peers
        let peers = adapter.pubsub_peers("test-topic").await.unwrap();
        assert!(peers.is_empty());
    }

    /// `from_str` yields an online adapter; the test returns early instead of
    /// failing when the adapter cannot be created.
    #[tokio::test]
    async fn test_from_str() {
        let adapter = from_str("http://localhost:5001").await;
        if adapter.is_err() {
            println!(
                "Adapter creation failed (expected without IPFS daemon): {:?}",
                adapter.err()
            );
            return; // Skip the test when no IPFS daemon is running
        }

        let adapter = adapter.unwrap();
        assert!(adapter.is_online().await);
    }

    /// Round trip through the `IpfsClientCompat` trait methods.
    #[tokio::test]
    #[ignore] // Requires running IPFS daemon
    async fn test_compat_trait() {
        let client = IpfsClient::development().await.unwrap();

        let test_data = "compat trait test".as_bytes();
        let cursor = Cursor::new(test_data.to_vec());

        let response = client.add_compat(cursor).await.unwrap();
        let retrieved = client.cat_compat(&response.hash).await.unwrap();

        assert_eq!(test_data, retrieved.as_slice());
    }

    /// Data stored with `dag_put` is returned unchanged by `dag_get`.
    #[tokio::test]
    #[ignore] // Requires running IPFS daemon
    async fn test_dag_operations() {
        let adapter = IpfsClientAdapter::development().await.unwrap();

        let test_data = b"test dag adapter";
        let cid = adapter.dag_put(test_data).await.unwrap();

        let retrieved = adapter.dag_get(&cid, None).await.unwrap();
        assert_eq!(test_data, retrieved.as_slice());
    }

    /// Exercises pin add, pin ls and pin rm on freshly added data.
    #[tokio::test]
    #[ignore] // Requires running IPFS daemon
    async fn test_pin_operations() {
        let adapter = IpfsClientAdapter::development().await.unwrap();

        // Add some data first
        let test_data = "pin adapter test".as_bytes();
        let cursor = Cursor::new(test_data.to_vec());
        let response = adapter.add(cursor).await.unwrap();

        // Test pin add
        let pin_response = adapter.pin_add(&response.hash, true).await.unwrap();
        assert_eq!(pin_response.pin_type, PinType::Recursive);

        // Test pin ls
        let pins = adapter.pin_ls(None).await.unwrap();
        assert_eq!(pins.len(), 1);

        // Test pin rm
        let rm_response = adapter.pin_rm(&response.hash).await.unwrap();
        assert_eq!(rm_response.hash, response.hash);
    }

    /// The node info reports a non-empty agent version containing
    /// "guardian-db".
    #[tokio::test]
    #[ignore] // Requires running IPFS daemon
    async fn test_node_info() {
        let adapter = IpfsClientAdapter::development().await.unwrap();

        let info = adapter.id().await.unwrap();
        assert!(!info.agent_version.is_empty());
        assert!(info.agent_version.contains("guardian-db"));
    }

    /// The adapter's channel ID has the expected prefix and equals the one
    /// produced by the wrapped client.
    #[tokio::test]
    #[ignore] // Requires running IPFS daemon
    async fn test_channel_id_compatibility() {
        let adapter = IpfsClientAdapter::development().await.unwrap();
        let other_peer = libp2p::PeerId::random();

        let channel_id = adapter.get_channel_id(&other_peer);
        assert!(channel_id.starts_with("/ipfs-pubsub-direct-channel/v1/"));

        // Test that it matches the inner client
        let inner_channel_id = adapter.inner().get_channel_id(&other_peer);
        assert_eq!(channel_id, inner_channel_id);
    }
}