ipfrs_storage/
coalesce.rs

1//! Write coalescing for batching similar writes
2//!
3//! Combines multiple write operations into batches to improve performance:
4//! - Time-based batching (flush after interval)
5//! - Size-based batching (flush when batch size reached)
6//! - Automatic flushing on shutdown
7//! - Configurable batch sizes and intervals
8//!
9//! ## Example
10//! ```no_run
11//! use ipfrs_storage::{CoalescingBlockStore, CoalesceConfig, MemoryBlockStore};
12//! use std::time::Duration;
13//!
14//! #[tokio::main]
15//! async fn main() {
16//!     let store = MemoryBlockStore::new();
17//!     let config = CoalesceConfig::new(100, Duration::from_millis(100));
18//!     let coalescing_store = CoalescingBlockStore::new(store, config);
19//!
20//!     // Writes are automatically batched
21//! }
22//! ```
23
24use crate::traits::BlockStore;
25use async_trait::async_trait;
26use ipfrs_core::{Block, Cid, Result as IpfsResult};
27use parking_lot::Mutex;
28use serde::{Deserialize, Serialize};
29use std::collections::HashMap;
30use std::sync::Arc;
31use std::time::{Duration, Instant};
32use tokio::time::sleep;
33
34/// Write coalescing configuration
35#[derive(Debug, Clone, Serialize, Deserialize)]
36pub struct CoalesceConfig {
37    /// Maximum batch size before auto-flush
38    pub max_batch_size: usize,
39    /// Maximum time to wait before auto-flush
40    pub max_batch_time: Duration,
41    /// Enable automatic background flushing
42    pub auto_flush: bool,
43}
44
45impl CoalesceConfig {
46    /// Create a new coalescing configuration
47    pub fn new(max_batch_size: usize, max_batch_time: Duration) -> Self {
48        Self {
49            max_batch_size,
50            max_batch_time,
51            auto_flush: true,
52        }
53    }
54
55    /// Disable automatic background flushing
56    pub fn without_auto_flush(mut self) -> Self {
57        self.auto_flush = false;
58        self
59    }
60}
61
62impl Default for CoalesceConfig {
63    fn default() -> Self {
64        Self::new(100, Duration::from_millis(100))
65    }
66}
67
68/// Pending write operation
69#[derive(Debug, Clone)]
70struct PendingWrite {
71    block: Block,
72    #[allow(dead_code)]
73    added_at: Instant,
74}
75
76/// Internal state for write coalescing
77#[derive(Debug)]
78struct CoalescingState {
79    /// Pending writes by CID
80    pending: HashMap<Cid, PendingWrite>,
81    /// When the oldest pending write was added
82    oldest_write: Option<Instant>,
83    /// Total writes coalesced
84    total_writes: u64,
85    /// Total flushes performed
86    total_flushes: u64,
87    /// Total blocks written
88    total_blocks: u64,
89}
90
91/// Coalescing statistics
92#[derive(Debug, Clone, Default, Serialize, Deserialize)]
93pub struct CoalesceStats {
94    /// Total write operations received
95    pub total_writes: u64,
96    /// Total flush operations
97    pub total_flushes: u64,
98    /// Total blocks actually written
99    pub total_blocks: u64,
100    /// Current pending writes
101    pub pending_writes: usize,
102    /// Coalescing ratio (writes per flush)
103    pub coalescing_ratio: f64,
104}
105
106/// Block store with write coalescing
107pub struct CoalescingBlockStore<S: BlockStore> {
108    inner: S,
109    config: CoalesceConfig,
110    state: Arc<Mutex<CoalescingState>>,
111}
112
113impl<S: BlockStore + Clone> CoalescingBlockStore<S> {
114    /// Create a new coalescing block store
115    pub fn new(inner: S, config: CoalesceConfig) -> Self
116    where
117        S: 'static,
118    {
119        let store = Self {
120            inner: inner.clone(),
121            config,
122            state: Arc::new(Mutex::new(CoalescingState {
123                pending: HashMap::new(),
124                oldest_write: None,
125                total_writes: 0,
126                total_flushes: 0,
127                total_blocks: 0,
128            })),
129        };
130
131        // Start background flush task if auto-flush is enabled
132        if store.config.auto_flush {
133            let state = Arc::clone(&store.state);
134            let config = store.config.clone();
135
136            tokio::spawn(async move {
137                loop {
138                    sleep(config.max_batch_time / 2).await;
139
140                    let should_flush = {
141                        let state = state.lock();
142                        if let Some(oldest) = state.oldest_write {
143                            oldest.elapsed() >= config.max_batch_time
144                        } else {
145                            false
146                        }
147                    };
148
149                    if should_flush {
150                        let _ = Self::flush_pending(&inner, &state).await;
151                    }
152                }
153            });
154        }
155
156        store
157    }
158
159    /// Get coalescing statistics
160    pub fn stats(&self) -> CoalesceStats {
161        let state = self.state.lock();
162
163        CoalesceStats {
164            total_writes: state.total_writes,
165            total_flushes: state.total_flushes,
166            total_blocks: state.total_blocks,
167            pending_writes: state.pending.len(),
168            coalescing_ratio: if state.total_flushes > 0 {
169                state.total_writes as f64 / state.total_flushes as f64
170            } else {
171                0.0
172            },
173        }
174    }
175
176    /// Manually flush pending writes
177    pub async fn flush_writes(&self) -> IpfsResult<usize> {
178        Self::flush_pending(&self.inner, &self.state).await
179    }
180
181    /// Internal flush implementation
182    async fn flush_pending(inner: &S, state: &Arc<Mutex<CoalescingState>>) -> IpfsResult<usize> {
183        let blocks_to_write = {
184            let mut state = state.lock();
185            if state.pending.is_empty() {
186                return Ok(0);
187            }
188
189            let blocks: Vec<_> = state.pending.values().map(|pw| pw.block.clone()).collect();
190
191            let count = blocks.len();
192            state.pending.clear();
193            state.oldest_write = None;
194            state.total_flushes += 1;
195            state.total_blocks += count as u64;
196
197            blocks
198        };
199
200        let count = blocks_to_write.len();
201
202        // Write blocks
203        inner.put_many(&blocks_to_write).await?;
204
205        Ok(count)
206    }
207}
208
209#[async_trait]
210impl<S: BlockStore + Clone> BlockStore for CoalescingBlockStore<S> {
211    async fn get(&self, cid: &Cid) -> IpfsResult<Option<Block>> {
212        // Check pending writes first
213        {
214            let state = self.state.lock();
215            if let Some(pending) = state.pending.get(cid) {
216                return Ok(Some(pending.block.clone()));
217            }
218        }
219
220        self.inner.get(cid).await
221    }
222
223    async fn put(&self, block: &Block) -> IpfsResult<()> {
224        let should_flush = {
225            let mut state = self.state.lock();
226            state.total_writes += 1;
227
228            let pending_write = PendingWrite {
229                block: block.clone(),
230                added_at: Instant::now(),
231            };
232
233            if state.oldest_write.is_none() {
234                state.oldest_write = Some(Instant::now());
235            }
236
237            state.pending.insert(*block.cid(), pending_write);
238
239            state.pending.len() >= self.config.max_batch_size
240        };
241
242        if should_flush {
243            Self::flush_pending(&self.inner, &self.state).await?;
244        }
245
246        Ok(())
247    }
248
249    async fn has(&self, cid: &Cid) -> IpfsResult<bool> {
250        // Check pending writes
251        {
252            let state = self.state.lock();
253            if state.pending.contains_key(cid) {
254                return Ok(true);
255            }
256        }
257
258        self.inner.has(cid).await
259    }
260
261    async fn delete(&self, cid: &Cid) -> IpfsResult<()> {
262        // Remove from pending if present
263        {
264            let mut state = self.state.lock();
265            state.pending.remove(cid);
266            if state.pending.is_empty() {
267                state.oldest_write = None;
268            }
269        }
270
271        self.inner.delete(cid).await
272    }
273
274    fn list_cids(&self) -> IpfsResult<Vec<Cid>> {
275        let mut cids = self.inner.list_cids()?;
276
277        // Add pending writes
278        {
279            let state = self.state.lock();
280            cids.extend(state.pending.keys().copied());
281        }
282
283        cids.sort();
284        cids.dedup();
285        Ok(cids)
286    }
287
288    fn len(&self) -> usize {
289        let pending_count = self.state.lock().pending.len();
290        self.inner.len() + pending_count
291    }
292
293    async fn flush(&self) -> IpfsResult<()> {
294        // Flush pending writes first
295        Self::flush_pending(&self.inner, &self.state).await?;
296        self.inner.flush().await
297    }
298
299    async fn put_many(&self, blocks: &[Block]) -> IpfsResult<()> {
300        // Add to pending batch
301        {
302            let mut state = self.state.lock();
303            let now = Instant::now();
304
305            if state.oldest_write.is_none() {
306                state.oldest_write = Some(now);
307            }
308
309            for block in blocks {
310                state.total_writes += 1;
311                state.pending.insert(
312                    *block.cid(),
313                    PendingWrite {
314                        block: block.clone(),
315                        added_at: now,
316                    },
317                );
318            }
319        }
320
321        // Flush if batch is large enough
322        let should_flush = {
323            let state = self.state.lock();
324            state.pending.len() >= self.config.max_batch_size
325        };
326
327        if should_flush {
328            Self::flush_pending(&self.inner, &self.state).await?;
329        }
330
331        Ok(())
332    }
333
334    async fn get_many(&self, cids: &[Cid]) -> IpfsResult<Vec<Option<Block>>> {
335        let mut results = Vec::with_capacity(cids.len());
336        let mut missing_cids = Vec::new();
337
338        // Check pending first
339        {
340            let state = self.state.lock();
341            for cid in cids {
342                if let Some(pending) = state.pending.get(cid) {
343                    results.push(Some(pending.block.clone()));
344                } else {
345                    results.push(None);
346                    missing_cids.push(*cid);
347                }
348            }
349        }
350
351        // Get missing from inner store
352        if !missing_cids.is_empty() {
353            let inner_results = self.inner.get_many(&missing_cids).await?;
354            let mut inner_idx = 0;
355
356            for result in &mut results {
357                if result.is_none() {
358                    *result = inner_results[inner_idx].clone();
359                    inner_idx += 1;
360                }
361            }
362        }
363
364        Ok(results)
365    }
366
367    async fn has_many(&self, cids: &[Cid]) -> IpfsResult<Vec<bool>> {
368        let mut results = Vec::with_capacity(cids.len());
369        let mut missing_cids = Vec::new();
370
371        // Check pending first
372        {
373            let state = self.state.lock();
374            for cid in cids {
375                if state.pending.contains_key(cid) {
376                    results.push(true);
377                } else {
378                    results.push(false);
379                    missing_cids.push(*cid);
380                }
381            }
382        }
383
384        // Check missing in inner store
385        if !missing_cids.is_empty() {
386            let inner_results = self.inner.has_many(&missing_cids).await?;
387            let mut inner_idx = 0;
388
389            for result in &mut results {
390                if !*result {
391                    *result = inner_results[inner_idx];
392                    inner_idx += 1;
393                }
394            }
395        }
396
397        Ok(results)
398    }
399
400    async fn delete_many(&self, cids: &[Cid]) -> IpfsResult<()> {
401        // Remove from pending
402        {
403            let mut state = self.state.lock();
404            for cid in cids {
405                state.pending.remove(cid);
406            }
407            if state.pending.is_empty() {
408                state.oldest_write = None;
409            }
410        }
411
412        self.inner.delete_many(cids).await
413    }
414}
415
#[cfg(test)]
mod tests {
    use super::*;
    use crate::memory::MemoryBlockStore;
    use crate::utils::create_block;

    /// Build a store over a fresh `MemoryBlockStore` with the background flusher disabled,
    /// so that only the size threshold or an explicit call can trigger a flush.
    fn manual_store(max_batch_size: usize) -> CoalescingBlockStore<MemoryBlockStore> {
        let config =
            CoalesceConfig::new(max_batch_size, Duration::from_secs(10)).without_auto_flush();
        CoalescingBlockStore::new(MemoryBlockStore::new(), config)
    }

    /// Writes below the size threshold stay buffered and do not trigger a flush.
    #[tokio::test]
    async fn test_coalescing_basic() {
        let coalescing = manual_store(3);

        for payload in [b"data1".to_vec(), b"data2".to_vec()] {
            let block = create_block(payload).unwrap();
            coalescing.put(&block).await.unwrap();
        }

        let stats = coalescing.stats();
        assert_eq!(stats.total_writes, 2);
        assert_eq!(stats.total_flushes, 0);
        assert_eq!(stats.pending_writes, 2);
    }

    /// Reaching the size threshold flushes the batch without an explicit call.
    #[tokio::test]
    async fn test_coalescing_auto_flush() {
        let coalescing = manual_store(2);

        for payload in [b"data1".to_vec(), b"data2".to_vec()] {
            let block = create_block(payload).unwrap();
            coalescing.put(&block).await.unwrap();
        }

        let stats = coalescing.stats();
        assert_eq!(stats.total_writes, 2);
        assert_eq!(stats.total_flushes, 1);
        assert_eq!(stats.pending_writes, 0);
    }

    /// `flush_writes` drains the buffer and reports how many blocks it wrote.
    #[tokio::test]
    async fn test_coalescing_manual_flush() {
        let coalescing = manual_store(100);

        for byte in 0u8..5 {
            let block = create_block(vec![byte; 10]).unwrap();
            coalescing.put(&block).await.unwrap();
        }
        assert_eq!(coalescing.stats().pending_writes, 5);

        let flushed = coalescing.flush_writes().await.unwrap();
        assert_eq!(flushed, 5);
        assert_eq!(coalescing.stats().pending_writes, 0);
    }

    /// Blocks that are still buffered are visible through `has` and `get`.
    #[tokio::test]
    async fn test_coalescing_read_pending() {
        let coalescing = manual_store(100);

        let block = create_block(b"test data".to_vec()).unwrap();
        let cid = *block.cid();
        coalescing.put(&block).await.unwrap();

        assert!(coalescing.has(&cid).await.unwrap());

        let retrieved = coalescing.get(&cid).await.unwrap();
        assert!(retrieved.is_some());
        assert_eq!(retrieved.unwrap().data(), block.data());
    }

    /// Six writes with a threshold of three produce exactly two size-triggered flushes.
    #[tokio::test]
    async fn test_coalescing_stats() {
        let coalescing = manual_store(3);

        for byte in 0u8..6 {
            let block = create_block(vec![byte; 10]).unwrap();
            coalescing.put(&block).await.unwrap();
        }

        let CoalesceStats {
            total_writes,
            total_flushes,
            coalescing_ratio,
            ..
        } = coalescing.stats();
        assert_eq!(total_writes, 6);
        assert_eq!(total_flushes, 2);
        assert!(coalescing_ratio > 0.0);
    }
}
514        assert_eq!(stats.total_flushes, 2); // Two auto-flushes at threshold
515        assert!(stats.coalescing_ratio > 0.0);
516    }
517}