ipfrs_storage/
otel.rs

1//! OpenTelemetry integration for distributed tracing
2//!
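//! This module wraps a block store so that every storage operation runs inside a
//! `tracing` span. The spans are exported to OpenTelemetry only when the application
//! installs a subscriber layer that forwards them (for example `tracing-opentelemetry`);
//! with such a layer in place they allow tracking requests across distributed systems
//! and analyzing performance.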
5//!
6//! # Example
7//!
8//! ```rust,no_run
9//! use ipfrs_storage::{OtelBlockStore, MemoryBlockStore, BlockStoreTrait};
10//! use ipfrs_core::Block;
11//!
12//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
13//! let store = MemoryBlockStore::new();
14//! let traced_store = OtelBlockStore::new(store, "storage_node_1".to_string());
15//!
16//! // Operations are now automatically traced
17//! let block = Block::new(b"hello".to_vec().into())?;
18//! traced_store.put(&block).await?;
19//! # Ok(())
20//! # }
21//! ```
22
23use crate::traits::BlockStore;
24use async_trait::async_trait;
25use ipfrs_core::{Block, Cid, Result};
26use std::sync::Arc;
27use std::time::Instant;
28use tracing::{debug, error, info_span, warn, Instrument};
29
/// OpenTelemetry-instrumented BlockStore wrapper
///
/// This wrapper adds distributed tracing spans to all storage operations,
/// making it easy to track performance and debug issues in distributed systems.
///
/// Cloning the wrapper is cheap: clones share the same underlying store.
pub struct OtelBlockStore<S> {
    /// The wrapped store, shared through an `Arc` so clones of the wrapper
    /// refer to the same instance.
    inner: Arc<S>,
    /// Value recorded as the `service.name` field on every span.
    service_name: String,
}

// Implemented by hand rather than derived: `#[derive(Clone)]` would require
// `S: Clone`, but cloning the wrapper only needs to bump the `Arc` refcount.
impl<S> Clone for OtelBlockStore<S> {
    fn clone(&self) -> Self {
        Self {
            inner: Arc::clone(&self.inner),
            service_name: self.service_name.clone(),
        }
    }
}
39
40impl<S: BlockStore> OtelBlockStore<S> {
41    /// Create a new OpenTelemetry-instrumented block store
42    pub fn new(store: S, service_name: String) -> Self {
43        Self {
44            inner: Arc::new(store),
45            service_name,
46        }
47    }
48
49    /// Get reference to the inner store
50    pub fn inner(&self) -> &S {
51        &self.inner
52    }
53
54    /// Extract span attributes from a CID
55    #[allow(dead_code)]
56    fn cid_attributes(cid: &Cid) -> Vec<(&'static str, String)> {
57        vec![
58            ("cid", cid.to_string()),
59            ("cid.version", format!("{:?}", cid.version())),
60            ("cid.codec", format!("{}", cid.codec())),
61        ]
62    }
63}
64
65#[async_trait]
66impl<S: BlockStore + Send + Sync + 'static> BlockStore for OtelBlockStore<S> {
67    async fn put(&self, block: &Block) -> Result<()> {
68        let cid = block.cid();
69        let start = Instant::now();
70
71        let span = info_span!(
72            "blockstore.put",
73            service.name = %self.service_name,
74            cid = %cid,
75            block.size = block.data().len(),
76        );
77
78        let result = self.inner.put(block).instrument(span).await;
79
80        let duration_us = start.elapsed().as_micros();
81        match &result {
82            Ok(_) => {
83                debug!(
84                    cid = %cid,
85                    duration_us = duration_us,
86                    "Block put succeeded"
87                );
88            }
89            Err(e) => {
90                error!(
91                    cid = %cid,
92                    duration_us = duration_us,
93                    error = %e,
94                    "Block put failed"
95                );
96            }
97        }
98
99        result
100    }
101
102    async fn get(&self, cid: &Cid) -> Result<Option<Block>> {
103        let start = Instant::now();
104
105        let span = info_span!(
106            "blockstore.get",
107            service.name = %self.service_name,
108            cid = %cid,
109        );
110
111        let result = self.inner.get(cid).instrument(span).await;
112
113        let duration_us = start.elapsed().as_micros();
114        match &result {
115            Ok(Some(block)) => {
116                debug!(
117                    cid = %cid,
118                    duration_us = duration_us,
119                    block.size = block.data().len(),
120                    "Block get succeeded (hit)"
121                );
122            }
123            Ok(None) => {
124                debug!(
125                    cid = %cid,
126                    duration_us = duration_us,
127                    "Block get succeeded (miss)"
128                );
129            }
130            Err(e) => {
131                error!(
132                    cid = %cid,
133                    duration_us = duration_us,
134                    error = %e,
135                    "Block get failed"
136                );
137            }
138        }
139
140        result
141    }
142
143    async fn has(&self, cid: &Cid) -> Result<bool> {
144        let start = Instant::now();
145
146        let span = info_span!(
147            "blockstore.has",
148            service.name = %self.service_name,
149            cid = %cid,
150        );
151
152        let result = self.inner.has(cid).instrument(span).await;
153
154        let duration_us = start.elapsed().as_micros();
155        match &result {
156            Ok(exists) => {
157                debug!(
158                    cid = %cid,
159                    duration_us = duration_us,
160                    exists = exists,
161                    "Block has check succeeded"
162                );
163            }
164            Err(e) => {
165                error!(
166                    cid = %cid,
167                    duration_us = duration_us,
168                    error = %e,
169                    "Block has check failed"
170                );
171            }
172        }
173
174        result
175    }
176
177    async fn delete(&self, cid: &Cid) -> Result<()> {
178        let start = Instant::now();
179
180        let span = info_span!(
181            "blockstore.delete",
182            service.name = %self.service_name,
183            cid = %cid,
184        );
185
186        let result = self.inner.delete(cid).instrument(span).await;
187
188        let duration_us = start.elapsed().as_micros();
189        match &result {
190            Ok(_) => {
191                debug!(
192                    cid = %cid,
193                    duration_us = duration_us,
194                    "Block delete succeeded"
195                );
196            }
197            Err(e) => {
198                error!(
199                    cid = %cid,
200                    duration_us = duration_us,
201                    error = %e,
202                    "Block delete failed"
203                );
204            }
205        }
206
207        result
208    }
209
210    async fn put_many(&self, blocks: &[Block]) -> Result<()> {
211        let start = Instant::now();
212        let total_size: usize = blocks.iter().map(|b| b.data().len()).sum();
213
214        let span = info_span!(
215            "blockstore.put_many",
216            service.name = %self.service_name,
217            blocks.count = blocks.len(),
218            blocks.total_size = total_size,
219        );
220
221        let result = self.inner.put_many(blocks).instrument(span).await;
222
223        let duration_us = start.elapsed().as_micros();
224        match &result {
225            Ok(_) => {
226                debug!(
227                    blocks.count = blocks.len(),
228                    duration_us = duration_us,
229                    throughput_mbps = (total_size as f64 / duration_us as f64) * 1000.0,
230                    "Batch put succeeded"
231                );
232            }
233            Err(e) => {
234                error!(
235                    blocks.count = blocks.len(),
236                    duration_us = duration_us,
237                    error = %e,
238                    "Batch put failed"
239                );
240            }
241        }
242
243        result
244    }
245
246    async fn get_many(&self, cids: &[Cid]) -> Result<Vec<Option<Block>>> {
247        let start = Instant::now();
248
249        let span = info_span!(
250            "blockstore.get_many",
251            service.name = %self.service_name,
252            cids.count = cids.len(),
253        );
254
255        let result = self.inner.get_many(cids).instrument(span).await;
256
257        let duration_us = start.elapsed().as_micros();
258        match &result {
259            Ok(blocks) => {
260                let hits = blocks.iter().filter(|b| b.is_some()).count();
261                let total_size: usize = blocks
262                    .iter()
263                    .filter_map(|b| b.as_ref())
264                    .map(|b| b.data().len())
265                    .sum();
266
267                debug!(
268                    cids.count = cids.len(),
269                    hits = hits,
270                    misses = cids.len() - hits,
271                    duration_us = duration_us,
272                    total_size = total_size,
273                    "Batch get succeeded"
274                );
275            }
276            Err(e) => {
277                error!(
278                    cids.count = cids.len(),
279                    duration_us = duration_us,
280                    error = %e,
281                    "Batch get failed"
282                );
283            }
284        }
285
286        result
287    }
288
289    async fn has_many(&self, cids: &[Cid]) -> Result<Vec<bool>> {
290        let start = Instant::now();
291
292        let span = info_span!(
293            "blockstore.has_many",
294            service.name = %self.service_name,
295            cids.count = cids.len(),
296        );
297
298        let result = self.inner.has_many(cids).instrument(span).await;
299
300        let duration_us = start.elapsed().as_micros();
301        match &result {
302            Ok(results) => {
303                let exists_count = results.iter().filter(|&&b| b).count();
304                debug!(
305                    cids.count = cids.len(),
306                    exists = exists_count,
307                    not_exists = cids.len() - exists_count,
308                    duration_us = duration_us,
309                    "Batch has check succeeded"
310                );
311            }
312            Err(e) => {
313                error!(
314                    cids.count = cids.len(),
315                    duration_us = duration_us,
316                    error = %e,
317                    "Batch has check failed"
318                );
319            }
320        }
321
322        result
323    }
324
325    async fn delete_many(&self, cids: &[Cid]) -> Result<()> {
326        let start = Instant::now();
327
328        let span = info_span!(
329            "blockstore.delete_many",
330            service.name = %self.service_name,
331            cids.count = cids.len(),
332        );
333
334        let result = self.inner.delete_many(cids).instrument(span).await;
335
336        let duration_us = start.elapsed().as_micros();
337        match &result {
338            Ok(_) => {
339                debug!(
340                    cids.count = cids.len(),
341                    duration_us = duration_us,
342                    "Batch delete succeeded"
343                );
344            }
345            Err(e) => {
346                error!(
347                    cids.count = cids.len(),
348                    duration_us = duration_us,
349                    error = %e,
350                    "Batch delete failed"
351                );
352            }
353        }
354
355        result
356    }
357
358    async fn flush(&self) -> Result<()> {
359        let start = Instant::now();
360
361        let span = info_span!(
362            "blockstore.flush",
363            service.name = %self.service_name,
364        );
365
366        let result = self.inner.flush().instrument(span).await;
367
368        let duration_us = start.elapsed().as_micros();
369        match &result {
370            Ok(_) => {
371                debug!(duration_us = duration_us, "Flush succeeded");
372            }
373            Err(e) => {
374                warn!(
375                    duration_us = duration_us,
376                    error = %e,
377                    "Flush failed"
378                );
379            }
380        }
381
382        result
383    }
384
385    fn list_cids(&self) -> Result<Vec<Cid>> {
386        let start = Instant::now();
387
388        let _span = info_span!(
389            "blockstore.list_cids",
390            service.name = %self.service_name,
391        );
392
393        let result = self.inner.list_cids();
394
395        let duration_us = start.elapsed().as_micros();
396        match &result {
397            Ok(cids) => {
398                debug!(
399                    cids.count = cids.len(),
400                    duration_us = duration_us,
401                    "List CIDs succeeded"
402                );
403            }
404            Err(e) => {
405                error!(
406                    duration_us = duration_us,
407                    error = %e,
408                    "List CIDs failed"
409                );
410            }
411        }
412
413        result
414    }
415
416    fn len(&self) -> usize {
417        self.inner.len()
418    }
419}
420
#[cfg(test)]
mod tests {
    // These tests drive the traced wrapper over a `MemoryBlockStore` and check
    // that every delegated operation still returns the inner store's results.
    use super::*;
    use crate::memory::MemoryBlockStore;

    #[tokio::test]
    async fn test_otel_put_get() {
        let store = MemoryBlockStore::new();
        let traced = OtelBlockStore::new(store, "test_node".to_string());

        let block = Block::new(b"hello world".to_vec().into()).unwrap();
        let cid = block.cid();

        traced.put(&block).await.unwrap();
        let retrieved = traced.get(cid).await.unwrap();
        assert!(retrieved.is_some());
        assert_eq!(retrieved.unwrap().data(), block.data());
    }

    #[tokio::test]
    async fn test_otel_get_miss() {
        let traced = OtelBlockStore::new(MemoryBlockStore::new(), "test_node".to_string());

        // Never stored, so the lookup must report a miss rather than an error.
        let absent = Block::new(b"never stored".to_vec().into()).unwrap();
        assert!(traced.get(absent.cid()).await.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_otel_has_delete() {
        let store = MemoryBlockStore::new();
        let traced = OtelBlockStore::new(store, "test_node".to_string());

        let block = Block::new(b"test data".to_vec().into()).unwrap();
        let cid = block.cid();

        traced.put(&block).await.unwrap();
        assert!(traced.has(cid).await.unwrap());

        traced.delete(cid).await.unwrap();
        assert!(!traced.has(cid).await.unwrap());
    }

    #[tokio::test]
    async fn test_otel_batch_operations() {
        let store = MemoryBlockStore::new();
        let traced = OtelBlockStore::new(store, "test_node".to_string());

        let blocks = vec![
            Block::new(b"block1".to_vec().into()).unwrap(),
            Block::new(b"block2".to_vec().into()).unwrap(),
            Block::new(b"block3".to_vec().into()).unwrap(),
        ];
        let cids: Vec<Cid> = blocks.iter().map(|b| b.cid().clone()).collect();

        traced.put_many(&blocks).await.unwrap();

        let has_results = traced.has_many(&cids).await.unwrap();
        assert_eq!(has_results.len(), 3);
        assert!(has_results.iter().all(|&b| b));

        let get_results = traced.get_many(&cids).await.unwrap();
        assert_eq!(get_results.len(), 3);
        assert!(get_results.iter().all(|b| b.is_some()));
    }

    #[tokio::test]
    async fn test_otel_get_many_partial_hits() {
        let traced = OtelBlockStore::new(MemoryBlockStore::new(), "test_node".to_string());

        let stored = Block::new(b"stored".to_vec().into()).unwrap();
        let absent = Block::new(b"absent".to_vec().into()).unwrap();
        traced.put(&stored).await.unwrap();

        // Exercises the hit/miss accounting on a mixed batch.
        let cids = vec![stored.cid().clone(), absent.cid().clone()];
        let results = traced.get_many(&cids).await.unwrap();
        assert_eq!(results.len(), 2);
        assert!(results[0].is_some());
        assert!(results[1].is_none());
    }

    #[tokio::test]
    async fn test_otel_list_flush_delete_many() {
        let traced = OtelBlockStore::new(MemoryBlockStore::new(), "test_node".to_string());

        let blocks = vec![
            Block::new(b"alpha".to_vec().into()).unwrap(),
            Block::new(b"beta".to_vec().into()).unwrap(),
            Block::new(b"gamma".to_vec().into()).unwrap(),
        ];
        let cids: Vec<Cid> = blocks.iter().map(|b| b.cid().clone()).collect();

        traced.put_many(&blocks).await.unwrap();
        assert_eq!(traced.len(), 3);
        assert_eq!(traced.list_cids().unwrap().len(), 3);

        traced.flush().await.unwrap();

        traced.delete_many(&cids).await.unwrap();
        assert!(traced.has_many(&cids).await.unwrap().iter().all(|&b| !b));
        assert_eq!(traced.len(), 0);
    }

    #[tokio::test]
    async fn test_otel_inner_access() {
        let store = MemoryBlockStore::new();
        let traced = OtelBlockStore::new(store, "test_node".to_string());

        // Can access inner store
        let block = Block::new(b"direct access".to_vec().into()).unwrap();
        traced.inner().put(&block).await.unwrap();

        // Visible through traced wrapper
        assert!(traced.has(block.cid()).await.unwrap());
    }
}