guardian_db/ipfs_core_api/
client.rs1use crate::error::{GuardianError, Result};
4use crate::ipfs_core_api::{config::ClientConfig, errors::IpfsError, types::*};
5use async_stream::stream;
6use cid::Cid;
7use libp2p::PeerId;
8use std::collections::HashMap;
9use std::pin::Pin;
10use std::sync::Arc;
11use tokio::sync::{RwLock, broadcast};
12use tracing::{debug, info, warn};
13
14#[derive(Debug)]
16struct ClientState {
17 subscriptions: HashMap<String, broadcast::Sender<PubsubMessage>>,
19 connected_peers: HashMap<PeerId, PeerInfo>,
21 node_info: NodeInfo,
23 pinned_objects: HashMap<String, PinType>,
25 repo_stats: RepoStats,
27}
28
29impl ClientState {
30 fn new(node_id: PeerId, config: &ClientConfig) -> Self {
31 let node_info = if config.data_store_path.is_some() {
32 NodeInfo {
33 id: node_id,
34 public_key: format!("ed25519_{}", hex::encode(&node_id.to_bytes()[..16])),
35 addresses: config.listening_addrs.clone(),
36 agent_version: format!("{}/0.1.0", crate::ipfs_core_api::USER_AGENT),
37 protocol_version: "ipfs/0.1.0".to_string(),
38 }
39 } else {
40 NodeInfo::mock(node_id)
41 };
42
43 Self {
44 subscriptions: HashMap::new(),
45 connected_peers: HashMap::new(),
46 node_info,
47 pinned_objects: HashMap::new(),
48 repo_stats: RepoStats::default(),
49 }
50 }
51}
52
53#[derive(Clone)]
55pub struct IpfsClient {
56 backend: Arc<crate::ipfs_core_api::backends::IrohBackend>,
58 config: ClientConfig,
60 node_id: PeerId,
62 state: Arc<RwLock<ClientState>>,
64 initialized: bool,
66}
67
68impl IpfsClient {
69 pub async fn new(config: ClientConfig) -> Result<Self> {
71 config
73 .validate()
74 .map_err(|e| GuardianError::Other(format!("Invalid configuration: {}", e)))?;
75
76 info!("Inicializando cliente IPFS Core API");
77 info!(
78 "Configuração: PubSub={}, Swarm={}, mDNS={}, Kad={}",
79 config.enable_pubsub, config.enable_swarm, config.enable_mdns, config.enable_kad
80 );
81
82 let node_id = PeerId::random();
83 info!("Node ID gerado: {}", node_id);
84
85 let state = ClientState::new(node_id, &config);
86
87 let backend = Arc::new(crate::ipfs_core_api::backends::IrohBackend::new(&config).await?);
89
90 let client = Self {
91 backend,
92 config,
93 node_id,
94 state: Arc::new(RwLock::new(state)),
95 initialized: true,
96 };
97
98 debug!("Cliente IPFS configurado:");
100 debug!(" - Data path: {:?}", client.config.data_store_path);
101 debug!(" - Listening addrs: {:?}", client.config.listening_addrs);
102 debug!(
103 " - Bootstrap peers: {} configurados",
104 client.config.bootstrap_peers.len()
105 );
106
107 info!("✓ Cliente IPFS Core API inicializado com sucesso");
108 Ok(client)
109 }
110
111 pub async fn default() -> Result<Self> {
113 Self::new(ClientConfig::default()).await
114 }
115
116 pub async fn development() -> Result<Self> {
118 Self::new(ClientConfig::development()).await
119 }
120
121 pub async fn production() -> Result<Self> {
123 Self::new(ClientConfig::production()).await
124 }
125
126 pub async fn testing() -> Result<Self> {
128 Self::new(ClientConfig::testing()).await
129 }
130
131 pub async fn new_with_backend(
133 backend: Arc<crate::ipfs_core_api::backends::IrohBackend>,
134 ) -> Result<Self> {
135 let config = ClientConfig::default();
136 let node_id = PeerId::random();
137 let state = ClientState::new(node_id, &config);
138
139 Ok(Self {
140 backend,
141 config,
142 node_id,
143 state: Arc::new(RwLock::new(state)),
144 initialized: true,
145 })
146 }
147
148 fn ensure_initialized(&self) -> Result<()> {
150 if !self.initialized {
151 return Err(IpfsError::ClientNotInitialized.into());
152 }
153 Ok(())
154 }
155
156 pub async fn is_online(&self) -> bool {
158 self.initialized
159 }
160
161 pub async fn add<R>(&self, data: R) -> Result<AddResponse>
163 where
164 R: tokio::io::AsyncRead + Send + Unpin + 'static,
165 {
166 self.ensure_initialized()?;
167
168 use crate::ipfs_core_api::backends::IpfsBackend;
169 let pinned_data = Pin::new(Box::new(data));
170 self.backend.add(pinned_data).await
171 }
172
173 pub async fn add_bytes(&self, data: Vec<u8>) -> Result<AddResponse> {
175 self.ensure_initialized()?;
176
177 use crate::ipfs_core_api::backends::IpfsBackend;
178
179 struct BytesReader {
181 data: Vec<u8>,
182 pos: usize,
183 }
184
185 impl tokio::io::AsyncRead for BytesReader {
186 fn poll_read(
187 mut self: std::pin::Pin<&mut Self>,
188 _cx: &mut std::task::Context<'_>,
189 buf: &mut tokio::io::ReadBuf<'_>,
190 ) -> std::task::Poll<std::io::Result<()>> {
191 let remaining = self.data.len() - self.pos;
192 let to_read = std::cmp::min(remaining, buf.remaining());
193
194 if to_read == 0 {
195 return std::task::Poll::Ready(Ok(()));
196 }
197
198 buf.put_slice(&self.data[self.pos..self.pos + to_read]);
199 self.pos += to_read;
200
201 std::task::Poll::Ready(Ok(()))
202 }
203 }
204
205 let reader = BytesReader { data, pos: 0 };
206 let pinned_data = Pin::new(Box::new(reader));
207 self.backend.add(pinned_data).await
208 }
209
210 pub async fn cat(&self, path: &str) -> Result<Pin<Box<dyn tokio::io::AsyncRead + Send>>> {
212 self.ensure_initialized()?;
213
214 use crate::ipfs_core_api::backends::IpfsBackend;
215 self.backend.cat(path).await
216 }
217
218 pub async fn dag_get(&self, cid: &Cid, _path: Option<&str>) -> Result<Vec<u8>> {
220 self.ensure_initialized()?;
221
222 debug!("dag_get: cid={}", cid);
223
224 use crate::ipfs_core_api::backends::IpfsBackend;
225 let mut reader = self.backend.cat(&cid.to_string()).await?;
227 let mut data = Vec::new();
228 tokio::io::AsyncReadExt::read_to_end(&mut reader, &mut data).await?;
229 Ok(data)
230 }
231
232 pub async fn dag_put(&self, data: &[u8]) -> Result<Cid> {
234 self.ensure_initialized()?;
235
236 use sha2::{Digest, Sha256};
237
238 let mut hasher = Sha256::new();
240 hasher.update(data);
241 let digest = hasher.finalize();
242
243 let mh = multihash::Multihash::wrap(0x12, &digest)
245 .map_err(|e| GuardianError::Other(format!("Falha ao criar multihash: {}", e)))?; let cid = Cid::new_v1(0x0129, mh); {
252 }
254
255 {
257 let mut state = self.state.write().await;
258 state.repo_stats.num_objects += 1;
259 state.repo_stats.repo_size += data.len() as u64;
260 }
261
262 debug!("Objeto DAG armazenado: {} ({} bytes)", cid, data.len());
263 Ok(cid)
264 }
265
266 pub async fn pubsub_publish(&self, topic: &str, data: &[u8]) -> Result<()> {
268 self.ensure_initialized()?;
269
270 if !self.config.enable_pubsub {
271 return Err(IpfsError::unsupported("PubSub não está habilitado").into());
272 }
273
274 if data.len() > self.config.pubsub.max_message_size {
275 return Err(IpfsError::pubsub(format!(
276 "Mensagem muito grande: {} bytes (máximo: {})",
277 data.len(),
278 self.config.pubsub.max_message_size
279 ))
280 .into());
281 }
282
283 debug!(
284 "Publicando mensagem no tópico '{}': {} bytes",
285 topic,
286 data.len()
287 );
288
289 let message = PubsubMessage::new(self.node_id, topic.to_string(), data.to_vec());
291
292 let state_guard = self.state.read().await;
294 if let Some(sender) = state_guard.subscriptions.get(topic) {
295 if sender.send(message).is_err() {
296 debug!("Nenhum subscriber ativo para tópico '{}'", topic);
297 }
298 } else {
299 debug!("Nenhuma subscrição ativa para tópico '{}'", topic);
300 }
301
302 debug!("Mensagem publicada com sucesso no tópico '{}'", topic);
303 Ok(())
304 }
305
306 pub async fn pubsub_subscribe(&self, topic: &str) -> Result<PubsubStream> {
308 self.ensure_initialized()?;
309
310 if !self.config.enable_pubsub {
311 return Err(IpfsError::unsupported("PubSub não está habilitado").into());
312 }
313
314 debug!("Subscrevendo ao tópico: {}", topic);
315
316 let (sender, mut receiver) = broadcast::channel(self.config.pubsub.message_buffer_size);
318
319 {
321 let mut state_guard = self.state.write().await;
322 state_guard.subscriptions.insert(topic.to_string(), sender);
323 }
324
325 let stream = stream! {
327 while let Ok(msg) = receiver.recv().await {
328 yield Ok(msg);
329 }
330 };
331
332 debug!("Subscrição criada para tópico: {}", topic);
333 Ok(Box::pin(stream))
334 }
335
336 pub async fn pubsub_peers(&self, topic: &str) -> Result<Vec<PeerId>> {
338 self.ensure_initialized()?;
339
340 if !self.config.enable_pubsub {
341 return Err(IpfsError::unsupported("PubSub não está habilitado").into());
342 }
343
344 debug!("Listando peers do tópico: {}", topic);
345
346 match self.backend.get_topic_mesh_peers(topic).await {
348 Ok(mesh_peers) => {
349 debug!(
350 "Encontrados {} peers no mesh do Gossipsub para tópico '{}'",
351 mesh_peers.len(),
352 topic
353 );
354 Ok(mesh_peers)
355 }
356 Err(_) => {
357 warn!("SwarmManager não disponível, usando fallback para peers conectados");
359 let state_guard = self.state.read().await;
360 let connected_peers: Vec<PeerId> = state_guard
361 .connected_peers
362 .values()
363 .filter(|peer| peer.connected)
364 .map(|peer| peer.id)
365 .collect();
366
367 debug!(
368 "Fallback: encontrados {} peers conectados para tópico '{}'",
369 connected_peers.len(),
370 topic
371 );
372 Ok(connected_peers)
373 }
374 }
375 }
376
377 pub async fn pubsub_topics(&self) -> Result<Vec<String>> {
379 self.ensure_initialized()?;
380
381 if !self.config.enable_pubsub {
382 return Err(IpfsError::unsupported("PubSub não está habilitado").into());
383 }
384
385 let state_guard = self.state.read().await;
386 Ok(state_guard.subscriptions.keys().cloned().collect())
387 }
388
389 pub async fn pubsub_unsubscribe(&self, topic: &str) -> Result<()> {
391 self.ensure_initialized()?;
392
393 if !self.config.enable_pubsub {
394 return Err(IpfsError::unsupported("PubSub não está habilitado").into());
395 }
396
397 debug!("Cancelando subscrição do tópico: {}", topic);
398
399 let mut state_guard = self.state.write().await;
401 state_guard.subscriptions.remove(topic);
402
403 debug!("Subscrição cancelada para tópico: {}", topic);
404 Ok(())
405 }
406
407 pub async fn swarm_connect(&self, peer: &PeerId) -> Result<()> {
409 self.ensure_initialized()?;
410
411 if !self.config.enable_swarm {
412 return Err(IpfsError::unsupported("Swarm não está habilitado").into());
413 }
414
415 debug!("Conectando ao peer: {}", peer);
416
417 let mut state_guard = self.state.write().await;
419 state_guard
420 .connected_peers
421 .insert(*peer, PeerInfo::mock(*peer, true));
422
423 info!("Conexão simulada estabelecida com peer: {}", peer);
424 Ok(())
425 }
426
427 pub async fn swarm_peers(&self) -> Result<Vec<PeerInfo>> {
429 self.ensure_initialized()?;
430
431 let state_guard = self.state.read().await;
432 Ok(state_guard.connected_peers.values().cloned().collect())
433 }
434
435 pub async fn id(&self) -> Result<NodeInfo> {
437 self.ensure_initialized()?;
438
439 let state_guard = self.state.read().await;
440 Ok(state_guard.node_info.clone())
441 }
442
443 pub async fn pin_add(&self, hash: &str, recursive: bool) -> Result<PinResponse> {
445 self.ensure_initialized()?;
446
447 debug!("Pinning objeto: {} (recursive: {})", hash, recursive);
448
449 use crate::ipfs_core_api::backends::IpfsBackend;
451 let _ = self.backend.cat(hash).await?; let pin_type = if recursive {
454 PinType::Recursive
455 } else {
456 PinType::Direct
457 };
458
459 {
461 let mut state = self.state.write().await;
462 state
463 .pinned_objects
464 .insert(hash.to_string(), pin_type.clone());
465 }
466
467 debug!("Objeto {} pinned com sucesso", hash);
468 Ok(PinResponse {
469 hash: hash.to_string(),
470 pin_type,
471 })
472 }
473
474 pub async fn pin_rm(&self, hash: &str) -> Result<PinResponse> {
476 self.ensure_initialized()?;
477
478 debug!("Unpinning objeto: {}", hash);
479
480 let mut state = self.state.write().await;
481 let pin_type = state
482 .pinned_objects
483 .remove(hash)
484 .ok_or_else(|| IpfsError::data_not_found(format!("Pin not found for {}", hash)))?;
485
486 debug!("Objeto {} unpinned com sucesso", hash);
487 Ok(PinResponse {
488 hash: hash.to_string(),
489 pin_type,
490 })
491 }
492
493 pub async fn pin_ls(&self, pin_type_filter: Option<PinType>) -> Result<Vec<PinResponse>> {
495 self.ensure_initialized()?;
496
497 let state = self.state.read().await;
498 let pins: Vec<PinResponse> = state
499 .pinned_objects
500 .iter()
501 .filter(|(_, pin_type)| {
502 pin_type_filter
503 .as_ref()
504 .is_none_or(|filter| *pin_type == filter)
505 })
506 .map(|(hash, pin_type)| PinResponse {
507 hash: hash.clone(),
508 pin_type: pin_type.clone(),
509 })
510 .collect();
511
512 debug!("Listando {} objetos pinned", pins.len());
513 Ok(pins)
514 }
515
516 pub async fn repo_stat(&self) -> Result<RepoStats> {
518 self.ensure_initialized()?;
519
520 let state = self.state.read().await;
521 Ok(state.repo_stats.clone())
522 }
523
524 pub fn get_channel_id(&self, other_peer: &PeerId) -> String {
527 let mut channel_id_peers = [self.node_id.to_string(), other_peer.to_string()];
528 channel_id_peers.sort();
529 format!(
530 "/ipfs-pubsub-direct-channel/v1/{}",
531 channel_id_peers.join("/")
532 )
533 }
534
535 pub fn config(&self) -> &ClientConfig {
537 &self.config
538 }
539
540 pub fn node_id(&self) -> PeerId {
542 self.node_id
543 }
544
545 pub async fn shutdown(&self) -> Result<()> {
547 info!("Encerrando cliente IPFS");
548
549 {
551 let mut state = self.state.write().await;
552 state.subscriptions.clear();
553 state.connected_peers.clear();
554 }
555
556 info!("Cliente IPFS encerrado com sucesso");
559 Ok(())
560 }
561}
562
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Development configuration whose data directory is unique to this call, so tests
    /// that create a real backend do not share a store.
    fn unique_dev_config(label: &str) -> ClientConfig {
        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        let mut config = ClientConfig::development();
        config.data_store_path = Some(format!("./tmp/test_{}_{}", label, nanos).into());
        config
    }

    #[tokio::test]
    async fn test_client_creation() {
        let client = IpfsClient::new(unique_dev_config("creation")).await;
        assert!(client.is_ok());
    }

    #[tokio::test]
    async fn test_client_online() {
        let client = IpfsClient::new(unique_dev_config("online")).await.unwrap();
        assert!(client.is_online().await);
    }

    #[tokio::test]
    #[ignore]
    async fn test_add_and_cat() {
        let client = IpfsClient::development().await.unwrap();

        let test_data = "Hello, IPFS Core API!".as_bytes();
        let cursor = Cursor::new(test_data.to_vec());

        let response = client.add(cursor).await.unwrap();
        assert!(!response.hash.is_empty());
        assert_eq!(response.size_bytes().unwrap(), test_data.len());

        let mut stream = client.cat(&response.hash).await.unwrap();
        let mut buffer = Vec::new();
        tokio::io::AsyncReadExt::read_to_end(&mut stream, &mut buffer)
            .await
            .unwrap();

        assert_eq!(test_data, buffer.as_slice());
    }

    #[tokio::test]
    #[ignore]
    async fn test_dag_operations() {
        let client = IpfsClient::development().await.unwrap();

        let test_data = b"test dag data";
        let cid = client.dag_put(test_data).await.unwrap();

        let retrieved_data = client.dag_get(&cid, None).await.unwrap();
        assert_eq!(retrieved_data, test_data);
    }

    #[tokio::test]
    #[ignore]
    async fn test_pubsub_operations() {
        let client = IpfsClient::development().await.unwrap();

        // Publishing without any subscriber must still succeed.
        let result = client.pubsub_publish("test-topic", b"test message").await;
        assert!(result.is_ok());

        // Publishing alone does not create a subscription entry.
        let topics = client.pubsub_topics().await.unwrap();
        assert!(topics.is_empty());

        let peers = client.pubsub_peers("test-topic").await.unwrap();
        assert!(peers.is_empty());
    }

    #[tokio::test]
    #[ignore]
    async fn test_pin_operations() {
        let client = IpfsClient::development().await.unwrap();

        let test_data = "pin test data".as_bytes();
        let cursor = Cursor::new(test_data.to_vec());
        let response = client.add(cursor).await.unwrap();

        let pin_response = client.pin_add(&response.hash, true).await.unwrap();
        assert_eq!(pin_response.hash, response.hash);
        assert_eq!(pin_response.pin_type, PinType::Recursive);

        let pins = client.pin_ls(None).await.unwrap();
        assert_eq!(pins.len(), 1);
        assert_eq!(pins[0].hash, response.hash);

        let rm_response = client.pin_rm(&response.hash).await.unwrap();
        assert_eq!(rm_response.hash, response.hash);

        let pins_after = client.pin_ls(None).await.unwrap();
        assert!(pins_after.is_empty());
    }

    #[tokio::test]
    #[ignore]
    async fn test_node_info() {
        let client = IpfsClient::development().await.unwrap();

        let info = client.id().await.unwrap();
        assert_eq!(info.id, client.node_id());
        assert!(info.agent_version.contains("guardian-db"));
    }

    #[tokio::test]
    #[ignore]
    async fn test_channel_id_generation() {
        let client = IpfsClient::development().await.unwrap();
        let other_peer = PeerId::random();

        let channel_id = client.get_channel_id(&other_peer);
        assert!(channel_id.starts_with("/ipfs-pubsub-direct-channel/v1/"));

        // The id must be stable across calls for the same pair of peers.
        let channel_id2 = client.get_channel_id(&other_peer);
        assert_eq!(channel_id, channel_id2);
    }

    #[tokio::test]
    #[ignore]
    async fn test_error_handling() {
        use sha2::{Digest, Sha256};

        let client = IpfsClient::development().await.unwrap();

        let result = client.cat("QmNonExistent").await;
        assert!(result.is_err());

        // Build a well-formed CID for content that was never stored. A hand-written
        // placeholder string is not valid base32 and would panic while parsing, before
        // `dag_get` is ever exercised.
        let digest = Sha256::digest(b"guardian-db test: content that was never added");
        let mh = multihash::Multihash::wrap(0x12, &digest)
            .expect("a 32-byte digest always fits in a multihash");
        let missing_cid = Cid::new_v1(0x55, mh);

        let result = client.dag_get(&missing_cid, None).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    #[ignore]
    async fn test_repo_stats() {
        let client = IpfsClient::development().await.unwrap();

        let initial_stats = client.repo_stat().await.unwrap();
        assert_eq!(initial_stats.num_objects, 0);
        assert_eq!(initial_stats.repo_size, 0);

        let test_data = "stats test data".as_bytes();
        let cursor = Cursor::new(test_data.to_vec());
        client.add(cursor).await.unwrap();

        let updated_stats = client.repo_stat().await.unwrap();
        assert_eq!(updated_stats.num_objects, 1);
        assert_eq!(updated_stats.repo_size, test_data.len() as u64);
    }
}