1use crate::traits::BlockStore;
25use async_trait::async_trait;
26use ipfrs_core::{Block, Cid, Result as IpfsResult};
27use parking_lot::Mutex;
28use serde::{Deserialize, Serialize};
29use std::collections::HashMap;
30use std::sync::Arc;
31use std::time::{Duration, Instant};
32use tokio::time::sleep;
33
34#[derive(Debug, Clone, Serialize, Deserialize)]
36pub struct CoalesceConfig {
37 pub max_batch_size: usize,
39 pub max_batch_time: Duration,
41 pub auto_flush: bool,
43}
44
45impl CoalesceConfig {
46 pub fn new(max_batch_size: usize, max_batch_time: Duration) -> Self {
48 Self {
49 max_batch_size,
50 max_batch_time,
51 auto_flush: true,
52 }
53 }
54
55 pub fn without_auto_flush(mut self) -> Self {
57 self.auto_flush = false;
58 self
59 }
60}
61
62impl Default for CoalesceConfig {
63 fn default() -> Self {
64 Self::new(100, Duration::from_millis(100))
65 }
66}
67
68#[derive(Debug, Clone)]
70struct PendingWrite {
71 block: Block,
72 #[allow(dead_code)]
73 added_at: Instant,
74}
75
76#[derive(Debug)]
78struct CoalescingState {
79 pending: HashMap<Cid, PendingWrite>,
81 oldest_write: Option<Instant>,
83 total_writes: u64,
85 total_flushes: u64,
87 total_blocks: u64,
89}
90
91#[derive(Debug, Clone, Default, Serialize, Deserialize)]
93pub struct CoalesceStats {
94 pub total_writes: u64,
96 pub total_flushes: u64,
98 pub total_blocks: u64,
100 pub pending_writes: usize,
102 pub coalescing_ratio: f64,
104}
105
106pub struct CoalescingBlockStore<S: BlockStore> {
108 inner: S,
109 config: CoalesceConfig,
110 state: Arc<Mutex<CoalescingState>>,
111}
112
113impl<S: BlockStore + Clone> CoalescingBlockStore<S> {
114 pub fn new(inner: S, config: CoalesceConfig) -> Self
116 where
117 S: 'static,
118 {
119 let store = Self {
120 inner: inner.clone(),
121 config,
122 state: Arc::new(Mutex::new(CoalescingState {
123 pending: HashMap::new(),
124 oldest_write: None,
125 total_writes: 0,
126 total_flushes: 0,
127 total_blocks: 0,
128 })),
129 };
130
131 if store.config.auto_flush {
133 let state = Arc::clone(&store.state);
134 let config = store.config.clone();
135
136 tokio::spawn(async move {
137 loop {
138 sleep(config.max_batch_time / 2).await;
139
140 let should_flush = {
141 let state = state.lock();
142 if let Some(oldest) = state.oldest_write {
143 oldest.elapsed() >= config.max_batch_time
144 } else {
145 false
146 }
147 };
148
149 if should_flush {
150 let _ = Self::flush_pending(&inner, &state).await;
151 }
152 }
153 });
154 }
155
156 store
157 }
158
159 pub fn stats(&self) -> CoalesceStats {
161 let state = self.state.lock();
162
163 CoalesceStats {
164 total_writes: state.total_writes,
165 total_flushes: state.total_flushes,
166 total_blocks: state.total_blocks,
167 pending_writes: state.pending.len(),
168 coalescing_ratio: if state.total_flushes > 0 {
169 state.total_writes as f64 / state.total_flushes as f64
170 } else {
171 0.0
172 },
173 }
174 }
175
176 pub async fn flush_writes(&self) -> IpfsResult<usize> {
178 Self::flush_pending(&self.inner, &self.state).await
179 }
180
181 async fn flush_pending(inner: &S, state: &Arc<Mutex<CoalescingState>>) -> IpfsResult<usize> {
183 let blocks_to_write = {
184 let mut state = state.lock();
185 if state.pending.is_empty() {
186 return Ok(0);
187 }
188
189 let blocks: Vec<_> = state.pending.values().map(|pw| pw.block.clone()).collect();
190
191 let count = blocks.len();
192 state.pending.clear();
193 state.oldest_write = None;
194 state.total_flushes += 1;
195 state.total_blocks += count as u64;
196
197 blocks
198 };
199
200 let count = blocks_to_write.len();
201
202 inner.put_many(&blocks_to_write).await?;
204
205 Ok(count)
206 }
207}
208
209#[async_trait]
210impl<S: BlockStore + Clone> BlockStore for CoalescingBlockStore<S> {
211 async fn get(&self, cid: &Cid) -> IpfsResult<Option<Block>> {
212 {
214 let state = self.state.lock();
215 if let Some(pending) = state.pending.get(cid) {
216 return Ok(Some(pending.block.clone()));
217 }
218 }
219
220 self.inner.get(cid).await
221 }
222
223 async fn put(&self, block: &Block) -> IpfsResult<()> {
224 let should_flush = {
225 let mut state = self.state.lock();
226 state.total_writes += 1;
227
228 let pending_write = PendingWrite {
229 block: block.clone(),
230 added_at: Instant::now(),
231 };
232
233 if state.oldest_write.is_none() {
234 state.oldest_write = Some(Instant::now());
235 }
236
237 state.pending.insert(*block.cid(), pending_write);
238
239 state.pending.len() >= self.config.max_batch_size
240 };
241
242 if should_flush {
243 Self::flush_pending(&self.inner, &self.state).await?;
244 }
245
246 Ok(())
247 }
248
249 async fn has(&self, cid: &Cid) -> IpfsResult<bool> {
250 {
252 let state = self.state.lock();
253 if state.pending.contains_key(cid) {
254 return Ok(true);
255 }
256 }
257
258 self.inner.has(cid).await
259 }
260
261 async fn delete(&self, cid: &Cid) -> IpfsResult<()> {
262 {
264 let mut state = self.state.lock();
265 state.pending.remove(cid);
266 if state.pending.is_empty() {
267 state.oldest_write = None;
268 }
269 }
270
271 self.inner.delete(cid).await
272 }
273
274 fn list_cids(&self) -> IpfsResult<Vec<Cid>> {
275 let mut cids = self.inner.list_cids()?;
276
277 {
279 let state = self.state.lock();
280 cids.extend(state.pending.keys().copied());
281 }
282
283 cids.sort();
284 cids.dedup();
285 Ok(cids)
286 }
287
288 fn len(&self) -> usize {
289 let pending_count = self.state.lock().pending.len();
290 self.inner.len() + pending_count
291 }
292
293 async fn flush(&self) -> IpfsResult<()> {
294 Self::flush_pending(&self.inner, &self.state).await?;
296 self.inner.flush().await
297 }
298
299 async fn put_many(&self, blocks: &[Block]) -> IpfsResult<()> {
300 {
302 let mut state = self.state.lock();
303 let now = Instant::now();
304
305 if state.oldest_write.is_none() {
306 state.oldest_write = Some(now);
307 }
308
309 for block in blocks {
310 state.total_writes += 1;
311 state.pending.insert(
312 *block.cid(),
313 PendingWrite {
314 block: block.clone(),
315 added_at: now,
316 },
317 );
318 }
319 }
320
321 let should_flush = {
323 let state = self.state.lock();
324 state.pending.len() >= self.config.max_batch_size
325 };
326
327 if should_flush {
328 Self::flush_pending(&self.inner, &self.state).await?;
329 }
330
331 Ok(())
332 }
333
334 async fn get_many(&self, cids: &[Cid]) -> IpfsResult<Vec<Option<Block>>> {
335 let mut results = Vec::with_capacity(cids.len());
336 let mut missing_cids = Vec::new();
337
338 {
340 let state = self.state.lock();
341 for cid in cids {
342 if let Some(pending) = state.pending.get(cid) {
343 results.push(Some(pending.block.clone()));
344 } else {
345 results.push(None);
346 missing_cids.push(*cid);
347 }
348 }
349 }
350
351 if !missing_cids.is_empty() {
353 let inner_results = self.inner.get_many(&missing_cids).await?;
354 let mut inner_idx = 0;
355
356 for result in &mut results {
357 if result.is_none() {
358 *result = inner_results[inner_idx].clone();
359 inner_idx += 1;
360 }
361 }
362 }
363
364 Ok(results)
365 }
366
367 async fn has_many(&self, cids: &[Cid]) -> IpfsResult<Vec<bool>> {
368 let mut results = Vec::with_capacity(cids.len());
369 let mut missing_cids = Vec::new();
370
371 {
373 let state = self.state.lock();
374 for cid in cids {
375 if state.pending.contains_key(cid) {
376 results.push(true);
377 } else {
378 results.push(false);
379 missing_cids.push(*cid);
380 }
381 }
382 }
383
384 if !missing_cids.is_empty() {
386 let inner_results = self.inner.has_many(&missing_cids).await?;
387 let mut inner_idx = 0;
388
389 for result in &mut results {
390 if !*result {
391 *result = inner_results[inner_idx];
392 inner_idx += 1;
393 }
394 }
395 }
396
397 Ok(results)
398 }
399
400 async fn delete_many(&self, cids: &[Cid]) -> IpfsResult<()> {
401 {
403 let mut state = self.state.lock();
404 for cid in cids {
405 state.pending.remove(cid);
406 }
407 if state.pending.is_empty() {
408 state.oldest_write = None;
409 }
410 }
411
412 self.inner.delete_many(cids).await
413 }
414}
415
416#[cfg(test)]
417mod tests {
418 use super::*;
419 use crate::memory::MemoryBlockStore;
420 use crate::utils::create_block;
421
422 #[tokio::test]
423 async fn test_coalescing_basic() {
424 let store = MemoryBlockStore::new();
425 let config = CoalesceConfig::new(3, Duration::from_secs(10)).without_auto_flush();
426 let coalescing = CoalescingBlockStore::new(store, config);
427
428 let block1 = create_block(b"data1".to_vec()).unwrap();
430 let block2 = create_block(b"data2".to_vec()).unwrap();
431
432 coalescing.put(&block1).await.unwrap();
433 coalescing.put(&block2).await.unwrap();
434
435 let stats = coalescing.stats();
436 assert_eq!(stats.total_writes, 2);
437 assert_eq!(stats.total_flushes, 0);
438 assert_eq!(stats.pending_writes, 2);
439 }
440
441 #[tokio::test]
442 async fn test_coalescing_auto_flush() {
443 let store = MemoryBlockStore::new();
444 let config = CoalesceConfig::new(2, Duration::from_secs(10)).without_auto_flush();
445 let coalescing = CoalescingBlockStore::new(store, config);
446
447 let block1 = create_block(b"data1".to_vec()).unwrap();
449 let block2 = create_block(b"data2".to_vec()).unwrap();
450
451 coalescing.put(&block1).await.unwrap();
452 coalescing.put(&block2).await.unwrap();
453
454 let stats = coalescing.stats();
456 assert_eq!(stats.total_writes, 2);
457 assert_eq!(stats.total_flushes, 1);
458 assert_eq!(stats.pending_writes, 0);
459 }
460
461 #[tokio::test]
462 async fn test_coalescing_manual_flush() {
463 let store = MemoryBlockStore::new();
464 let config = CoalesceConfig::new(100, Duration::from_secs(10)).without_auto_flush();
465 let coalescing = CoalescingBlockStore::new(store, config);
466
467 for i in 0..5 {
469 let block = create_block(vec![i; 10]).unwrap();
470 coalescing.put(&block).await.unwrap();
471 }
472
473 assert_eq!(coalescing.stats().pending_writes, 5);
474
475 let flushed = coalescing.flush_writes().await.unwrap();
477 assert_eq!(flushed, 5);
478 assert_eq!(coalescing.stats().pending_writes, 0);
479 }
480
481 #[tokio::test]
482 async fn test_coalescing_read_pending() {
483 let store = MemoryBlockStore::new();
484 let config = CoalesceConfig::new(100, Duration::from_secs(10)).without_auto_flush();
485 let coalescing = CoalescingBlockStore::new(store, config);
486
487 let block = create_block(b"test data".to_vec()).unwrap();
488 let cid = *block.cid();
489
490 coalescing.put(&block).await.unwrap();
492
493 assert!(coalescing.has(&cid).await.unwrap());
495 let retrieved = coalescing.get(&cid).await.unwrap();
496 assert!(retrieved.is_some());
497 assert_eq!(retrieved.unwrap().data(), block.data());
498 }
499
500 #[tokio::test]
501 async fn test_coalescing_stats() {
502 let store = MemoryBlockStore::new();
503 let config = CoalesceConfig::new(3, Duration::from_secs(10)).without_auto_flush();
504 let coalescing = CoalescingBlockStore::new(store, config);
505
506 for i in 0..6 {
508 let block = create_block(vec![i; 10]).unwrap();
509 coalescing.put(&block).await.unwrap();
510 }
511
512 let stats = coalescing.stats();
513 assert_eq!(stats.total_writes, 6);
514 assert_eq!(stats.total_flushes, 2); assert!(stats.coalescing_ratio > 0.0);
516 }
517}