1use crate::traits::BlockStore;
24use async_trait::async_trait;
25use ipfrs_core::{Block, Cid, Result};
26use std::sync::Arc;
27use std::time::Instant;
28use tracing::{debug, error, info_span, warn, Instrument};
29
/// Wrapper around a block store that reports each operation through
/// `tracing` spans and events.
///
/// The wrapped store is held behind an [`Arc`], so cloning the wrapper shares
/// the same underlying store rather than duplicating it.
pub struct OtelBlockStore<S> {
    /// The wrapped store that actually performs the operations.
    inner: Arc<S>,
    /// Value recorded as the `service.name` field on every span.
    service_name: String,
}

// `Clone` is implemented by hand on purpose: `#[derive(Clone)]` would add an
// implicit `S: Clone` bound, making the wrapper non-cloneable for stores that
// are not `Clone` even though only the `Arc` handle needs to be cloned.
impl<S> Clone for OtelBlockStore<S> {
    fn clone(&self) -> Self {
        Self {
            inner: Arc::clone(&self.inner),
            service_name: self.service_name.clone(),
        }
    }
}
39
40impl<S: BlockStore> OtelBlockStore<S> {
41 pub fn new(store: S, service_name: String) -> Self {
43 Self {
44 inner: Arc::new(store),
45 service_name,
46 }
47 }
48
49 pub fn inner(&self) -> &S {
51 &self.inner
52 }
53
54 #[allow(dead_code)]
56 fn cid_attributes(cid: &Cid) -> Vec<(&'static str, String)> {
57 vec![
58 ("cid", cid.to_string()),
59 ("cid.version", format!("{:?}", cid.version())),
60 ("cid.codec", format!("{}", cid.codec())),
61 ]
62 }
63}
64
65#[async_trait]
66impl<S: BlockStore + Send + Sync + 'static> BlockStore for OtelBlockStore<S> {
67 async fn put(&self, block: &Block) -> Result<()> {
68 let cid = block.cid();
69 let start = Instant::now();
70
71 let span = info_span!(
72 "blockstore.put",
73 service.name = %self.service_name,
74 cid = %cid,
75 block.size = block.data().len(),
76 );
77
78 let result = self.inner.put(block).instrument(span).await;
79
80 let duration_us = start.elapsed().as_micros();
81 match &result {
82 Ok(_) => {
83 debug!(
84 cid = %cid,
85 duration_us = duration_us,
86 "Block put succeeded"
87 );
88 }
89 Err(e) => {
90 error!(
91 cid = %cid,
92 duration_us = duration_us,
93 error = %e,
94 "Block put failed"
95 );
96 }
97 }
98
99 result
100 }
101
102 async fn get(&self, cid: &Cid) -> Result<Option<Block>> {
103 let start = Instant::now();
104
105 let span = info_span!(
106 "blockstore.get",
107 service.name = %self.service_name,
108 cid = %cid,
109 );
110
111 let result = self.inner.get(cid).instrument(span).await;
112
113 let duration_us = start.elapsed().as_micros();
114 match &result {
115 Ok(Some(block)) => {
116 debug!(
117 cid = %cid,
118 duration_us = duration_us,
119 block.size = block.data().len(),
120 "Block get succeeded (hit)"
121 );
122 }
123 Ok(None) => {
124 debug!(
125 cid = %cid,
126 duration_us = duration_us,
127 "Block get succeeded (miss)"
128 );
129 }
130 Err(e) => {
131 error!(
132 cid = %cid,
133 duration_us = duration_us,
134 error = %e,
135 "Block get failed"
136 );
137 }
138 }
139
140 result
141 }
142
143 async fn has(&self, cid: &Cid) -> Result<bool> {
144 let start = Instant::now();
145
146 let span = info_span!(
147 "blockstore.has",
148 service.name = %self.service_name,
149 cid = %cid,
150 );
151
152 let result = self.inner.has(cid).instrument(span).await;
153
154 let duration_us = start.elapsed().as_micros();
155 match &result {
156 Ok(exists) => {
157 debug!(
158 cid = %cid,
159 duration_us = duration_us,
160 exists = exists,
161 "Block has check succeeded"
162 );
163 }
164 Err(e) => {
165 error!(
166 cid = %cid,
167 duration_us = duration_us,
168 error = %e,
169 "Block has check failed"
170 );
171 }
172 }
173
174 result
175 }
176
177 async fn delete(&self, cid: &Cid) -> Result<()> {
178 let start = Instant::now();
179
180 let span = info_span!(
181 "blockstore.delete",
182 service.name = %self.service_name,
183 cid = %cid,
184 );
185
186 let result = self.inner.delete(cid).instrument(span).await;
187
188 let duration_us = start.elapsed().as_micros();
189 match &result {
190 Ok(_) => {
191 debug!(
192 cid = %cid,
193 duration_us = duration_us,
194 "Block delete succeeded"
195 );
196 }
197 Err(e) => {
198 error!(
199 cid = %cid,
200 duration_us = duration_us,
201 error = %e,
202 "Block delete failed"
203 );
204 }
205 }
206
207 result
208 }
209
210 async fn put_many(&self, blocks: &[Block]) -> Result<()> {
211 let start = Instant::now();
212 let total_size: usize = blocks.iter().map(|b| b.data().len()).sum();
213
214 let span = info_span!(
215 "blockstore.put_many",
216 service.name = %self.service_name,
217 blocks.count = blocks.len(),
218 blocks.total_size = total_size,
219 );
220
221 let result = self.inner.put_many(blocks).instrument(span).await;
222
223 let duration_us = start.elapsed().as_micros();
224 match &result {
225 Ok(_) => {
226 debug!(
227 blocks.count = blocks.len(),
228 duration_us = duration_us,
229 throughput_mbps = (total_size as f64 / duration_us as f64) * 1000.0,
230 "Batch put succeeded"
231 );
232 }
233 Err(e) => {
234 error!(
235 blocks.count = blocks.len(),
236 duration_us = duration_us,
237 error = %e,
238 "Batch put failed"
239 );
240 }
241 }
242
243 result
244 }
245
246 async fn get_many(&self, cids: &[Cid]) -> Result<Vec<Option<Block>>> {
247 let start = Instant::now();
248
249 let span = info_span!(
250 "blockstore.get_many",
251 service.name = %self.service_name,
252 cids.count = cids.len(),
253 );
254
255 let result = self.inner.get_many(cids).instrument(span).await;
256
257 let duration_us = start.elapsed().as_micros();
258 match &result {
259 Ok(blocks) => {
260 let hits = blocks.iter().filter(|b| b.is_some()).count();
261 let total_size: usize = blocks
262 .iter()
263 .filter_map(|b| b.as_ref())
264 .map(|b| b.data().len())
265 .sum();
266
267 debug!(
268 cids.count = cids.len(),
269 hits = hits,
270 misses = cids.len() - hits,
271 duration_us = duration_us,
272 total_size = total_size,
273 "Batch get succeeded"
274 );
275 }
276 Err(e) => {
277 error!(
278 cids.count = cids.len(),
279 duration_us = duration_us,
280 error = %e,
281 "Batch get failed"
282 );
283 }
284 }
285
286 result
287 }
288
289 async fn has_many(&self, cids: &[Cid]) -> Result<Vec<bool>> {
290 let start = Instant::now();
291
292 let span = info_span!(
293 "blockstore.has_many",
294 service.name = %self.service_name,
295 cids.count = cids.len(),
296 );
297
298 let result = self.inner.has_many(cids).instrument(span).await;
299
300 let duration_us = start.elapsed().as_micros();
301 match &result {
302 Ok(results) => {
303 let exists_count = results.iter().filter(|&&b| b).count();
304 debug!(
305 cids.count = cids.len(),
306 exists = exists_count,
307 not_exists = cids.len() - exists_count,
308 duration_us = duration_us,
309 "Batch has check succeeded"
310 );
311 }
312 Err(e) => {
313 error!(
314 cids.count = cids.len(),
315 duration_us = duration_us,
316 error = %e,
317 "Batch has check failed"
318 );
319 }
320 }
321
322 result
323 }
324
325 async fn delete_many(&self, cids: &[Cid]) -> Result<()> {
326 let start = Instant::now();
327
328 let span = info_span!(
329 "blockstore.delete_many",
330 service.name = %self.service_name,
331 cids.count = cids.len(),
332 );
333
334 let result = self.inner.delete_many(cids).instrument(span).await;
335
336 let duration_us = start.elapsed().as_micros();
337 match &result {
338 Ok(_) => {
339 debug!(
340 cids.count = cids.len(),
341 duration_us = duration_us,
342 "Batch delete succeeded"
343 );
344 }
345 Err(e) => {
346 error!(
347 cids.count = cids.len(),
348 duration_us = duration_us,
349 error = %e,
350 "Batch delete failed"
351 );
352 }
353 }
354
355 result
356 }
357
358 async fn flush(&self) -> Result<()> {
359 let start = Instant::now();
360
361 let span = info_span!(
362 "blockstore.flush",
363 service.name = %self.service_name,
364 );
365
366 let result = self.inner.flush().instrument(span).await;
367
368 let duration_us = start.elapsed().as_micros();
369 match &result {
370 Ok(_) => {
371 debug!(duration_us = duration_us, "Flush succeeded");
372 }
373 Err(e) => {
374 warn!(
375 duration_us = duration_us,
376 error = %e,
377 "Flush failed"
378 );
379 }
380 }
381
382 result
383 }
384
385 fn list_cids(&self) -> Result<Vec<Cid>> {
386 let start = Instant::now();
387
388 let _span = info_span!(
389 "blockstore.list_cids",
390 service.name = %self.service_name,
391 );
392
393 let result = self.inner.list_cids();
394
395 let duration_us = start.elapsed().as_micros();
396 match &result {
397 Ok(cids) => {
398 debug!(
399 cids.count = cids.len(),
400 duration_us = duration_us,
401 "List CIDs succeeded"
402 );
403 }
404 Err(e) => {
405 error!(
406 duration_us = duration_us,
407 error = %e,
408 "List CIDs failed"
409 );
410 }
411 }
412
413 result
414 }
415
416 fn len(&self) -> usize {
417 self.inner.len()
418 }
419}
420
#[cfg(test)]
mod tests {
    use super::*;
    use crate::memory::MemoryBlockStore;

    /// Builds a block from a raw payload, panicking if construction fails.
    fn make_block(payload: &[u8]) -> Block {
        Block::new(payload.to_vec().into()).unwrap()
    }

    /// A block written through the wrapper can be read back with the same data.
    #[tokio::test]
    async fn test_otel_put_get() {
        let traced = OtelBlockStore::new(MemoryBlockStore::new(), String::from("test_node"));

        let block = make_block(b"hello world");
        let cid = block.cid();

        traced.put(&block).await.unwrap();

        let retrieved = traced.get(cid).await.unwrap();
        assert!(retrieved.is_some());
        assert_eq!(retrieved.unwrap().data(), block.data());
    }

    /// `has` reflects a preceding `put`, and a subsequent `delete`.
    #[tokio::test]
    async fn test_otel_has_delete() {
        let traced = OtelBlockStore::new(MemoryBlockStore::new(), String::from("test_node"));

        let block = make_block(b"test data");
        let cid = block.cid();

        traced.put(&block).await.unwrap();
        let present_after_put = traced.has(cid).await.unwrap();
        assert!(present_after_put);

        traced.delete(cid).await.unwrap();
        let present_after_delete = traced.has(cid).await.unwrap();
        assert!(!present_after_delete);
    }

    /// Batch put followed by batch has/get reports all three blocks.
    #[tokio::test]
    async fn test_otel_batch_operations() {
        let traced = OtelBlockStore::new(MemoryBlockStore::new(), String::from("test_node"));

        let blocks = vec![
            make_block(b"block1"),
            make_block(b"block2"),
            make_block(b"block3"),
        ];
        let cids: Vec<Cid> = blocks.iter().map(|b| b.cid()).cloned().collect();

        traced.put_many(&blocks).await.unwrap();

        let has_results = traced.has_many(&cids).await.unwrap();
        assert_eq!(has_results.len(), 3);
        assert!(has_results.iter().all(|&b| b));

        let get_results = traced.get_many(&cids).await.unwrap();
        assert_eq!(get_results.len(), 3);
        assert!(get_results.iter().all(|b| b.is_some()));
    }

    /// A block written directly to the inner store is visible via the wrapper.
    #[tokio::test]
    async fn test_otel_inner_access() {
        let traced = OtelBlockStore::new(MemoryBlockStore::new(), String::from("test_node"));

        let block = make_block(b"direct access");

        // Bypass the wrapper for the write, then read through it.
        traced.inner().put(&block).await.unwrap();

        assert!(traced.has(block.cid()).await.unwrap());
    }
}
490}