rhizome_p2p/
api.rs

1//! Rhizome P2P
2//!
3//! Rhizome is a high—performance, decentralized P2P messaging library implemented on Rust.
4//! It is based on the Kademlia DHT protocol with custom data replication and
5//! content ranking mechanisms.
6//!
7//! ## Features
8//! - 🦀 Rust Core: Maximum performance and memory security without GC.
9//! - 🔒 Anonymity: DHT-based routing hides direct connections between network participants.
10//! - ⚡ Async First: A fully asynchronous stack based on tokio and futures.
11//! - 🔄 Smart replication: Automatic distribution of data to k-nearest nodes.
12//! - 📈 Popularity system: Content in demand gets storage priority and a higher TTL.
13//! - 📦 Modularity: You can use it as a ready-made CLI node, or connect it as a library (cargo lib) to your project.
14
15uniffi::setup_scaffolding!("rhizome_p2p");
16
17/// Configuration Module
18pub mod config;
19/// Rhizome Exceptions Module
20pub mod exceptions;
21/// Module for logging and registration of events
22pub mod logger;
23
24/// Kademlia DHT realization
25pub mod dht;
26/// Realization of network working on more low transport level
27pub mod network;
28/// Module for work with nodes: types of nodes and their main functions
29pub mod node;
30/// Module for work with exchange of popular data and analyze metrics for this data
31pub mod popularity;
32/// Need for data copying to other nodes in network
33pub mod replication;
34/// Security module for create network more stable
35pub mod security;
36/// Local storage in node for fast data choosing
37pub mod storage;
38/// Support module for uniffi (need to build mobile versions)
39mod uniffi_bindgen;
40/// Some help functional for work with serialization and crypto
41pub mod utils;
42
43use std::path::PathBuf;
44use std::sync::Arc;
45use tokio::sync::RwLock;
46use tokio::time::{Duration, sleep};
47
48use crate::config::Config;
49use crate::exceptions::{DHTError, NetworkError, RhizomeError};
50use crate::node::full_node::FullNode;
51use crate::storage::keys::KeyManager;
52use crate::utils::crypto::hash_key;
53use crate::utils::serialization::{deserialize, serialize};
54use crate::utils::time::get_now_i64;
55
56#[derive(uniffi::Record, serde::Serialize, serde::Deserialize, Clone, Debug)]
57pub struct ThreadMetadataBridge {
58    pub id: String,
59    pub title: String,
60    pub created_at: i64,
61    pub creator_pubkey: String,
62    pub category: Option<String>,
63    pub tags: Vec<String>,
64    pub message_count: i32,
65    pub last_activity: i64,
66    pub popularity_score: f64,
67}
68
69#[derive(uniffi::Record, serde::Serialize, serde::Deserialize, Clone, Debug)]
70pub struct MessageBridge {
71    pub id: String,
72    pub thread_id: String,
73    pub parent_id: Option<String>,
74    pub content: String,
75    pub author_signature: Option<String>,
76    pub timestamp: i64,
77    pub content_type: String,
78    pub attachments: Vec<String>,
79}
80
81#[derive(uniffi::Object)]
82pub struct RhizomeClient {
83    // Оборачиваем внутреннее состояние для возможности работы через &self
84    inner: Arc<RwLock<ClientInner>>,
85}
86
87struct ClientInner {
88    pub config: Config,
89    pub node: Option<Arc<FullNode>>,
90    pub key_manager: KeyManager,
91    pub is_running: bool,
92}
93
94/// API client for work with protocol
95#[uniffi::export]
96impl RhizomeClient {
97    #[uniffi::constructor]
98    pub fn new(config_path: Option<String>) -> Arc<Self> {
99        let final_config = if let Some(path) = config_path {
100            Config::from_file(Some(PathBuf::from(path)))
101        } else {
102            let default_path = PathBuf::from("config.yaml");
103            if default_path.exists() {
104                Config::from_file(Some(default_path))
105            } else {
106                Config::from_file(None)
107            }
108        };
109
110        Arc::new(Self {
111            inner: Arc::new(RwLock::new(ClientInner {
112                config: final_config,
113                node: None,
114                key_manager: KeyManager::new(),
115                is_running: false,
116            })),
117        })
118    }
119
120    pub async fn start(&self) -> Result<(), RhizomeError> {
121        let mut inner = self.inner.write().await;
122        if inner.is_running {
123            return Err(RhizomeError::Network(NetworkError::General));
124        }
125
126        let node = FullNode::new(inner.config.clone())
127            .await
128            .map_err(|_| RhizomeError::Dht(DHTError::General))?;
129
130        let node_arc = Arc::new(node);
131        node_arc
132            .start()
133            .await
134            .map_err(|_| RhizomeError::Network(NetworkError::General))?;
135
136        inner.node = Some(node_arc);
137        inner.is_running = true;
138
139        sleep(Duration::from_secs(1)).await;
140        Ok(())
141    }
142
143    pub async fn stop(&self) -> Result<(), RhizomeError> {
144        let mut inner = self.inner.write().await;
145        if let Some(node) = inner.node.take()
146            && inner.is_running
147        {
148            node.stop()
149                .await
150                .map_err(|_| RhizomeError::Network(NetworkError::General))?;
151            inner.is_running = false;
152        }
153        Ok(())
154    }
155
156    pub async fn create_thread(
157        &self,
158        thread_id: String,
159        title: String,
160        category: Option<String>,
161        tags: Option<Vec<String>>,
162        creator_pubkey: Option<String>,
163        ttl: i32,
164    ) -> Result<ThreadMetadataBridge, RhizomeError> {
165        let inner = self.inner.read().await;
166        let node = inner
167            .node
168            .as_ref()
169            .ok_or(RhizomeError::Dht(DHTError::NodeNotFound))?;
170
171        let creator = creator_pubkey
172            .unwrap_or_else(|| format!("0x{}", hex::encode(&hash_key(thread_id.as_bytes())[..8])));
173
174        let thread_meta = ThreadMetadataBridge {
175            id: thread_id.clone(),
176            title,
177            created_at: get_now_i64(),
178            creator_pubkey: creator,
179            category,
180            tags: tags.unwrap_or_default(),
181            message_count: 0,
182            last_activity: get_now_i64(),
183            popularity_score: 0.0,
184        };
185
186        let meta_key = inner.key_manager.get_thread_meta_key(&thread_id);
187        let meta_data =
188            serialize(&thread_meta, "msgpack").map_err(|_| RhizomeError::Dht(DHTError::General))?;
189        node.store(&meta_key, &meta_data, ttl).await?;
190
191        // Обновление индекса
192        let threads_key = inner.key_manager.get_global_threads_key();
193        let mut thread_list: Vec<String> = match node.find_value(&threads_key).await {
194            Ok(data) => deserialize(&data, "msgpack").unwrap_or_default(),
195            Err(_) => Vec::new(),
196        };
197
198        if !thread_list.contains(&thread_id) {
199            thread_list.push(thread_id);
200            let list_data = serialize(&thread_list, "msgpack")
201                .map_err(|_| RhizomeError::Dht(DHTError::General))?;
202            node.store(&threads_key, &list_data, 86400).await?;
203        }
204
205        Ok(thread_meta)
206    }
207
208    pub async fn add_message(
209        &self,
210        thread_id: String,
211        content: String,
212        author_signature: Option<String>,
213        parent_id: Option<String>,
214        content_type: String,
215        ttl: i32,
216    ) -> Result<MessageBridge, RhizomeError> {
217        let inner = self.inner.read().await;
218        let node = inner
219            .node
220            .as_ref()
221            .ok_or(RhizomeError::Dht(DHTError::NodeNotFound))?;
222
223        let timestamp = get_now_i64();
224        let message_id = format!("msg_{}_{}", thread_id, timestamp);
225
226        let signature = author_signature.unwrap_or_else(|| {
227            format!("sig_{}", hex::encode(&hash_key(message_id.as_bytes())[..8]))
228        });
229
230        let message = MessageBridge {
231            id: message_id.clone(),
232            thread_id: thread_id.clone(),
233            parent_id,
234            content,
235            author_signature: Some(signature),
236            timestamp,
237            content_type,
238            attachments: vec![],
239        };
240
241        let message_hash = hex::encode(&hash_key(message_id.as_bytes())[..8]);
242        let message_key = inner.key_manager.get_message_key(&message_hash);
243        let message_data =
244            serialize(&message, "msgpack").map_err(|_| RhizomeError::Dht(DHTError::General))?;
245
246        node.store(&message_key, &message_data, ttl).await?;
247
248        // Здесь мы бы вызвали update_thread, но для краткости опустим (логика аналогична)
249        Ok(message)
250    }
251
252    // Для API используем String (JSON), так как UniFFI не поддерживает динамический Value
253    pub async fn get_popular_threads_json(&self, limit: u32) -> Result<String, RhizomeError> {
254        let inner = self.inner.read().await;
255        let node = inner
256            .node
257            .as_ref()
258            .ok_or(RhizomeError::Dht(DHTError::NodeNotFound))?;
259
260        let all_metrics = node
261            .metrics_collector
262            .read()
263            .await
264            .get_all_metrics()
265            .clone();
266        let ranked = node
267            .popularity_ranker
268            .rank_items(&all_metrics, Some(limit as usize));
269
270        let result = serde_json::json!(
271            ranked
272                .iter()
273                .map(|item| {
274                    serde_json::json!({
275                        "key": hex::encode(&item.key),
276                        "score": item.score
277                    })
278                })
279                .collect::<Vec<_>>()
280        );
281
282        Ok(result.to_string())
283    }
284
285    pub async fn get_node_info_json(&self) -> String {
286        let inner = self.inner.read().await;
287        match &inner.node {
288            Some(node) => {
289                serde_json::json!({
290                    "node_id": hex::encode(node.node_id.0),
291                    "node_type": format!("{:?}", node.node_type),
292                    "is_running": inner.is_running,
293                    "address": format!("{}:{}", inner.config.network.listen_host, inner.config.network.listen_port),
294                }).to_string()
295            }
296            None => serde_json::json!({"status": "not_initialized"}).to_string(),
297        }
298    }
299}