saorsa_node/client/
quantum.rs1use super::chunk_protocol::send_and_await_chunk_response;
20use super::data_types::{DataChunk, XorName};
21use crate::ant_protocol::{
22 ChunkGetRequest, ChunkGetResponse, ChunkMessage, ChunkMessageBody, ChunkPutRequest,
23 ChunkPutResponse,
24};
25use crate::error::{Error, Result};
26use bytes::Bytes;
27use saorsa_core::P2PNode;
28use std::sync::atomic::{AtomicU64, Ordering};
29use std::sync::Arc;
30use std::time::Duration;
31use tracing::{debug, info};
32
/// Default value for `QuantumConfig::timeout_secs`: how many seconds a chunk
/// request waits for its response before failing with a timeout error.
const DEFAULT_TIMEOUT_SECS: u64 = 30;
35
/// Default value for `QuantumConfig::replica_count`.
// NOTE(review): nothing in this file reads the replica count after it is stored
// in the config — confirm where (or whether) it is consumed.
const DEFAULT_REPLICA_COUNT: u8 = 4;
38
/// Settings that control how a `QuantumClient` talks to the network.
#[derive(Debug, Clone)]
pub struct QuantumConfig {
    /// Seconds to wait for the response to a GET or PUT request before the
    /// call fails with a timeout error.
    pub timeout_secs: u64,
    /// Replica count setting.
    // NOTE(review): presumably the desired number of stored copies, but no
    // client method in this file reads it — confirm against callers.
    pub replica_count: u8,
    /// Encryption toggle; defaults to `true`.
    // NOTE(review): no client method in this file reads this flag, so whether
    // it currently has any effect cannot be told from here — confirm.
    pub encrypt_data: bool,
}
49
50impl Default for QuantumConfig {
51 fn default() -> Self {
52 Self {
53 timeout_secs: DEFAULT_TIMEOUT_SECS,
54 replica_count: DEFAULT_REPLICA_COUNT,
55 encrypt_data: true,
56 }
57 }
58}
59
/// Client that stores and retrieves chunks on the saorsa network through a
/// `P2PNode`.
pub struct QuantumClient {
    /// Settings supplied at construction; `timeout_secs` bounds every request.
    config: QuantumConfig,
    /// Network handle. `None` until `with_node` is called; while it is `None`
    /// every chunk operation returns a network error.
    p2p_node: Option<Arc<P2PNode>>,
    /// Counter that hands out request IDs. Starts at 1 and is incremented once
    /// per GET/PUT request.
    next_request_id: AtomicU64,
}
77
78impl QuantumClient {
79 #[must_use]
81 pub fn new(config: QuantumConfig) -> Self {
82 debug!("Creating quantum-resistant saorsa client");
83 Self {
84 config,
85 p2p_node: None,
86 next_request_id: AtomicU64::new(1),
87 }
88 }
89
90 #[must_use]
92 pub fn with_defaults() -> Self {
93 Self::new(QuantumConfig::default())
94 }
95
96 #[must_use]
98 pub fn with_node(mut self, node: Arc<P2PNode>) -> Self {
99 self.p2p_node = Some(node);
100 self
101 }
102
103 pub async fn get_chunk(&self, address: &XorName) -> Result<Option<DataChunk>> {
120 debug!(
121 "Querying saorsa network for chunk: {}",
122 hex::encode(address)
123 );
124
125 let Some(ref node) = self.p2p_node else {
126 return Err(Error::Network("P2P node not configured".into()));
127 };
128
129 let target_peer = Self::pick_target_peer(node).await?;
130
131 let request_id = self.next_request_id.fetch_add(1, Ordering::Relaxed);
133 let request = ChunkGetRequest::new(*address);
134 let message = ChunkMessage {
135 request_id,
136 body: ChunkMessageBody::GetRequest(request),
137 };
138 let message_bytes = message
139 .encode()
140 .map_err(|e| Error::Network(format!("Failed to encode GET request: {e}")))?;
141
142 let timeout = Duration::from_secs(self.config.timeout_secs);
143 let addr_hex = hex::encode(address);
144 let timeout_secs = self.config.timeout_secs;
145
146 send_and_await_chunk_response(
147 node,
148 &target_peer,
149 message_bytes,
150 request_id,
151 timeout,
152 |body| match body {
153 ChunkMessageBody::GetResponse(ChunkGetResponse::Success {
154 address: addr,
155 content,
156 }) => {
157 debug!(
158 "Found chunk {} on saorsa network ({} bytes)",
159 hex::encode(addr),
160 content.len()
161 );
162 Some(Ok(Some(DataChunk::new(addr, Bytes::from(content)))))
163 }
164 ChunkMessageBody::GetResponse(ChunkGetResponse::NotFound { .. }) => {
165 debug!("Chunk {} not found on saorsa network", addr_hex);
166 Some(Ok(None))
167 }
168 ChunkMessageBody::GetResponse(ChunkGetResponse::Error(e)) => Some(Err(
169 Error::Network(format!("Remote GET error for {addr_hex}: {e}")),
170 )),
171 _ => None,
172 },
173 |e| Error::Network(format!("Failed to send GET to peer {target_peer}: {e}")),
174 || {
175 Error::Network(format!(
176 "Timeout waiting for chunk {addr_hex} after {timeout_secs}s"
177 ))
178 },
179 )
180 .await
181 }
182
183 pub async fn put_chunk(&self, content: Bytes) -> Result<XorName> {
201 debug!("Storing chunk on saorsa network ({} bytes)", content.len());
202
203 let Some(ref node) = self.p2p_node else {
204 return Err(Error::Network("P2P node not configured".into()));
205 };
206
207 let target_peer = Self::pick_target_peer(node).await?;
208
209 let address = crate::client::compute_address(&content);
211
212 let empty_payment = rmp_serde::to_vec(&ant_evm::ProofOfPayment {
214 peer_quotes: vec![],
215 })
216 .map_err(|e| Error::Network(format!("Failed to serialize payment proof: {e}")))?;
217
218 let request_id = self.next_request_id.fetch_add(1, Ordering::Relaxed);
219 let request = ChunkPutRequest::with_payment(address, content.to_vec(), empty_payment);
220 let message = ChunkMessage {
221 request_id,
222 body: ChunkMessageBody::PutRequest(request),
223 };
224 let message_bytes = message
225 .encode()
226 .map_err(|e| Error::Network(format!("Failed to encode PUT request: {e}")))?;
227
228 let timeout = Duration::from_secs(self.config.timeout_secs);
229 let content_len = content.len();
230 let addr_hex = hex::encode(address);
231 let timeout_secs = self.config.timeout_secs;
232
233 send_and_await_chunk_response(
234 node,
235 &target_peer,
236 message_bytes,
237 request_id,
238 timeout,
239 |body| match body {
240 ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: addr }) => {
241 info!(
242 "Chunk stored at address: {} ({} bytes)",
243 hex::encode(addr),
244 content_len
245 );
246 Some(Ok(addr))
247 }
248 ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists {
249 address: addr,
250 }) => {
251 info!("Chunk already exists at address: {}", hex::encode(addr));
252 Some(Ok(addr))
253 }
254 ChunkMessageBody::PutResponse(ChunkPutResponse::PaymentRequired { message }) => {
255 Some(Err(Error::Network(format!("Payment required: {message}"))))
256 }
257 ChunkMessageBody::PutResponse(ChunkPutResponse::Error(e)) => Some(Err(
258 Error::Network(format!("Remote PUT error for {addr_hex}: {e}")),
259 )),
260 _ => None,
261 },
262 |e| Error::Network(format!("Failed to send PUT to peer {target_peer}: {e}")),
263 || {
264 Error::Network(format!(
265 "Timeout waiting for store response for {addr_hex} after {timeout_secs}s"
266 ))
267 },
268 )
269 .await
270 }
271
272 pub async fn exists(&self, address: &XorName) -> Result<bool> {
289 debug!(
290 "Checking existence on saorsa network: {}",
291 hex::encode(address)
292 );
293 self.get_chunk(address).await.map(|opt| opt.is_some())
294 }
295
296 async fn pick_target_peer(node: &P2PNode) -> Result<String> {
298 let peers = node.connected_peers().await;
299 peers
300 .into_iter()
301 .next()
302 .ok_or_else(|| Error::Network("No connected peers available".into()))
303 }
304}
305
#[cfg(test)]
// Test code may unwrap/expect freely: a panic is the intended failure signal.
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;

    /// The default configuration mirrors the module-level constants.
    #[test]
    fn test_quantum_config_default() {
        let QuantumConfig {
            timeout_secs,
            replica_count,
            encrypt_data,
        } = QuantumConfig::default();

        assert_eq!(timeout_secs, DEFAULT_TIMEOUT_SECS);
        assert_eq!(replica_count, DEFAULT_REPLICA_COUNT);
        assert!(encrypt_data);
    }

    /// A default client carries the default timeout and has no node attached.
    #[test]
    fn test_quantum_client_creation() {
        let fresh = QuantumClient::with_defaults();

        assert!(fresh.p2p_node.is_none());
        assert_eq!(fresh.config.timeout_secs, DEFAULT_TIMEOUT_SECS);
    }

    /// GET is rejected when no P2P node has been attached.
    #[tokio::test]
    async fn test_get_chunk_without_node_fails() {
        let nodeless = QuantumClient::with_defaults();
        let zero_address = [0u8; 32];

        assert!(nodeless.get_chunk(&zero_address).await.is_err());
    }

    /// PUT is rejected when no P2P node has been attached.
    #[tokio::test]
    async fn test_put_chunk_without_node_fails() {
        let nodeless = QuantumClient::with_defaults();
        let payload = Bytes::from("test data");

        assert!(nodeless.put_chunk(payload).await.is_err());
    }

    /// The existence check is rejected when no P2P node has been attached.
    #[tokio::test]
    async fn test_exists_without_node_fails() {
        let nodeless = QuantumClient::with_defaults();
        let zero_address = [0u8; 32];

        assert!(nodeless.exists(&zero_address).await.is_err());
    }
}