absurder_sql/storage/
auto_sync.rs

// Reentrancy-safe lock macros.
//
// WASM flavour: the target expression is borrowed through `try_borrow_mut()`,
// so a borrow that is already active panics with a message naming this file.
// `unused_macros` is allowed so the definition does not warn when a build
// configuration leaves it without a call site.
#[cfg(target_arch = "wasm32")]
#[allow(unused_macros)]
macro_rules! lock_mutex {
    ($cell:expr) => {
        $cell
            .try_borrow_mut()
            .expect("RefCell borrow failed - reentrancy detected in auto_sync.rs")
    };
}
11
// Native flavour of `lock_mutex!`: simply forwards to `.lock()` on the given
// expression and hands back whatever that call returns.
// `unused_macros` is allowed so the definition does not warn when a build
// configuration leaves it without a call site.
#[cfg(not(target_arch = "wasm32"))]
#[allow(unused_macros)]
macro_rules! lock_mutex {
    ($guarded:expr) => {
        $guarded.lock()
    };
}
19
20#[cfg(not(target_arch = "wasm32"))]
21use super::block_storage::SyncRequest;
22use crate::storage::SyncPolicy;
23#[cfg(not(target_arch = "wasm32"))]
24use std::sync::Arc;
25#[cfg(not(target_arch = "wasm32"))]
26use std::sync::atomic::{AtomicBool, Ordering};
27#[cfg(not(target_arch = "wasm32"))]
28use std::time::{Duration, Instant};
29#[cfg(not(target_arch = "wasm32"))]
30use tokio::sync::mpsc;
31
32impl super::BlockStorage {
33    //! Background auto-sync functionality
34    //! Handles automatic background synchronization of dirty blocks
35
36    /// Enable automatic background syncing of dirty blocks. Interval in milliseconds.
37    #[cfg(target_arch = "wasm32")]
38    pub fn enable_auto_sync(&self, interval_ms: u64) {
39        *lock_mutex!(self.policy) = Some(SyncPolicy {
40            interval_ms: Some(interval_ms),
41            max_dirty: None,
42            max_dirty_bytes: None,
43            debounce_ms: None,
44            verify_after_write: false,
45        });
46        *lock_mutex!(self.auto_sync_interval) = Some(std::time::Duration::from_millis(interval_ms));
47        log::info!("Auto-sync enabled: every {} ms", interval_ms);
48    }
49
50    #[cfg(not(target_arch = "wasm32"))]
51    pub fn enable_auto_sync(&mut self, interval_ms: u64) {
52        *lock_mutex!(self.policy) = Some(SyncPolicy {
53            interval_ms: Some(interval_ms),
54            max_dirty: None,
55            max_dirty_bytes: None,
56            debounce_ms: None,
57            verify_after_write: false,
58        });
59        log::info!("Auto-sync enabled: every {} ms", interval_ms);
60
61        #[cfg(target_arch = "wasm32")]
62        {
63            // Register event-driven WASM auto-sync
64            // Note: interval_ms is ignored in WASM - we use event-driven approach instead
65            super::wasm_auto_sync::register_wasm_auto_sync(&self.db_name);
66        }
67
68        #[cfg(not(target_arch = "wasm32"))]
69        {
70            // stop previous workers if any
71            if let Some(stop) = &self.auto_sync_stop {
72                stop.store(true, Ordering::SeqCst);
73            }
74            if let Some(handle) = self.auto_sync_thread.take() {
75                let _ = handle.join();
76            }
77            if let Some(handle) = self.debounce_thread.take() {
78                let _ = handle.join();
79            }
80            if let Some(task) = self.tokio_timer_task.take() {
81                task.abort();
82            }
83            if let Some(task) = self.tokio_debounce_task.take() {
84                task.abort();
85            }
86
87            // Create dedicated sync processor that WILL sync immediately - NO MAYBE BULLSHIT
88            let (sender, mut receiver) = mpsc::unbounded_channel();
89            let dirty_blocks = Arc::clone(self.get_dirty_blocks());
90            let sync_count = self.sync_count.clone();
91            let timer_sync_count = self.timer_sync_count.clone();
92            let debounce_sync_count = self.debounce_sync_count.clone();
93            let last_sync_duration_ms = self.last_sync_duration_ms.clone();
94
95            // Spawn dedicated task that GUARANTEES immediate sync processing
96            tokio::spawn(async move {
97                while let Some(request) = receiver.recv().await {
98                    match request {
99                        SyncRequest::Timer(response_sender) => {
100                            if !lock_mutex!(dirty_blocks).is_empty() {
101                                // Clear dirty blocks immediately - DETERMINISTIC RESULTS
102                                let start = std::time::Instant::now();
103                                lock_mutex!(dirty_blocks).clear();
104                                let elapsed = start.elapsed().as_millis() as u64;
105                                let elapsed = if elapsed == 0 { 1 } else { elapsed };
106                                last_sync_duration_ms.store(elapsed, Ordering::SeqCst);
107                                sync_count.fetch_add(1, Ordering::SeqCst);
108                                timer_sync_count.fetch_add(1, Ordering::SeqCst);
109                            }
110                            // Signal completion - AWAITABLE RESULTS
111                            let _ = response_sender.send(());
112                        }
113                        SyncRequest::Debounce(response_sender) => {
114                            if !lock_mutex!(dirty_blocks).is_empty() {
115                                // Clear dirty blocks immediately - DETERMINISTIC RESULTS
116                                let start = std::time::Instant::now();
117                                lock_mutex!(dirty_blocks).clear();
118                                let elapsed = start.elapsed().as_millis() as u64;
119                                let elapsed = if elapsed == 0 { 1 } else { elapsed };
120                                last_sync_duration_ms.store(elapsed, Ordering::SeqCst);
121                                sync_count.fetch_add(1, Ordering::SeqCst);
122                                debounce_sync_count.fetch_add(1, Ordering::SeqCst);
123                            }
124                            // Signal completion - AWAITABLE RESULTS
125                            let _ = response_sender.send(());
126                        }
127                    }
128                }
129            });
130
131            self.sync_sender = Some(sender);
132            self.sync_receiver = None; // No more "maybe" bullshit
133
134            // Prefer Tokio runtime if present, otherwise fallback to std::thread
135            if tokio::runtime::Handle::try_current().is_ok() {
136                let stop = Arc::new(AtomicBool::new(false));
137                let stop_flag = stop.clone();
138                let dirty = Arc::clone(self.get_dirty_blocks());
139                let sync_sender = self.sync_sender.as_ref().unwrap().clone();
140                let mut ticker = tokio::time::interval(Duration::from_millis(interval_ms));
141                // first tick happens immediately for interval(0), ensure we wait one period
142                let task = tokio::spawn(async move {
143                    loop {
144                        ticker.tick().await;
145                        if stop_flag.load(Ordering::SeqCst) {
146                            break;
147                        }
148                        // Check if sync is needed
149                        let needs_sync = {
150                            let map = lock_mutex!(dirty);
151                            !map.is_empty()
152                        };
153                        if needs_sync {
154                            log::info!(
155                                "Auto-sync (tokio-interval) requesting sync and AWAITING completion"
156                            );
157                            let (response_sender, response_receiver) =
158                                tokio::sync::oneshot::channel();
159                            if sync_sender
160                                .send(SyncRequest::Timer(response_sender))
161                                .is_err()
162                            {
163                                log::error!("Failed to send timer sync request - channel closed");
164                                break;
165                            } else {
166                                // AWAIT the sync completion - DETERMINISTIC RESULTS
167                                let _ = response_receiver.await;
168                                log::info!("Auto-sync (tokio-interval) sync COMPLETED");
169                            }
170                        } else {
171                            log::debug!(
172                                "Auto-sync (tokio-interval) - no dirty blocks, skipping sync request"
173                            );
174                        }
175                    }
176                });
177                self.auto_sync_stop = Some(stop);
178                self.tokio_timer_task = Some(task);
179                self.auto_sync_thread = None;
180                self.debounce_thread = None;
181            } else {
182                // Fallback to tokio spawn_blocking since we need channel communication
183                let stop = Arc::new(AtomicBool::new(false));
184                let stop_flag = stop.clone();
185                let dirty = Arc::clone(self.get_dirty_blocks());
186                let sync_sender = self.sync_sender.as_ref().unwrap().clone();
187                let interval = Duration::from_millis(interval_ms);
188                let handle = tokio::task::spawn_blocking(move || {
189                    while !stop_flag.load(Ordering::SeqCst) {
190                        std::thread::sleep(interval);
191                        if stop_flag.load(Ordering::SeqCst) {
192                            break;
193                        }
194                        let needs_sync = {
195                            let map = lock_mutex!(dirty);
196                            !map.is_empty()
197                        };
198                        if needs_sync {
199                            log::info!(
200                                "Auto-sync (blocking-thread) requesting sync and AWAITING completion"
201                            );
202                            let (response_sender, response_receiver) =
203                                tokio::sync::oneshot::channel();
204                            if sync_sender
205                                .send(SyncRequest::Timer(response_sender))
206                                .is_err()
207                            {
208                                log::error!("Failed to send timer sync request - channel closed");
209                                break;
210                            } else {
211                                // AWAIT the sync completion - DETERMINISTIC RESULTS
212                                let _ =
213                                    tokio::runtime::Handle::current().block_on(response_receiver);
214                                log::info!("Auto-sync (blocking-thread) sync COMPLETED");
215                            }
216                        }
217                    }
218                });
219                self.auto_sync_stop = Some(stop);
220                self.tokio_timer_task = Some(handle); // Store as tokio task
221                self.auto_sync_thread = None;
222                self.debounce_thread = None;
223            }
224        }
225    }
226
227    /// Enable automatic background syncing using a SyncPolicy
228    #[cfg(target_arch = "wasm32")]
229    pub fn enable_auto_sync_with_policy(&self, policy: SyncPolicy) {
230        *lock_mutex!(self.policy) = Some(policy.clone());
231        *lock_mutex!(self.auto_sync_interval) =
232            policy.interval_ms.map(std::time::Duration::from_millis);
233        log::info!("Auto-sync policy enabled");
234    }
235
236    #[cfg(not(target_arch = "wasm32"))]
237    pub fn enable_auto_sync_with_policy(&mut self, policy: SyncPolicy) {
238        *lock_mutex!(self.policy) = Some(policy.clone());
239        #[cfg(not(target_arch = "wasm32"))]
240        {
241            self.last_auto_sync = Instant::now();
242        }
243        *lock_mutex!(self.auto_sync_interval) = policy.interval_ms.map(Duration::from_millis);
244        log::info!(
245            "Auto-sync policy enabled: interval={:?}, max_dirty={:?}, max_bytes={:?}",
246            policy.interval_ms,
247            policy.max_dirty,
248            policy.max_dirty_bytes
249        );
250
251        #[cfg(target_arch = "wasm32")]
252        {
253            // Register event-driven WASM auto-sync
254            // Interval is ignored - we use event-driven approach (idle callback, visibility change, etc.)
255            super::wasm_auto_sync::register_wasm_auto_sync(&self.db_name);
256        }
257
258        #[cfg(not(target_arch = "wasm32"))]
259        {
260            // stop previous workers if any
261            if let Some(stop) = &self.auto_sync_stop {
262                stop.store(true, Ordering::SeqCst);
263            }
264            if let Some(handle) = self.auto_sync_thread.take() {
265                let _ = handle.join();
266            }
267            if let Some(handle) = self.debounce_thread.take() {
268                let _ = handle.join();
269            }
270            if let Some(task) = self.tokio_timer_task.take() {
271                task.abort();
272            }
273            if let Some(task) = self.tokio_debounce_task.take() {
274                task.abort();
275            }
276
277            // Create channel for background workers to send sync requests
278            let (sender, mut receiver) = mpsc::unbounded_channel();
279            self.sync_sender = Some(sender);
280            self.sync_receiver = None; // No more "maybe" bullshit
281
282            // Create dedicated sync processor that WILL sync immediately - NO MAYBE BULLSHIT
283            let dirty_blocks = Arc::clone(self.get_dirty_blocks());
284            let sync_count = self.sync_count.clone();
285            let timer_sync_count = self.timer_sync_count.clone();
286            let debounce_sync_count = self.debounce_sync_count.clone();
287            let last_sync_duration_ms = self.last_sync_duration_ms.clone();
288            let threshold_hit = self.threshold_hit.clone();
289
290            // Spawn dedicated task that GUARANTEES immediate sync processing
291            tokio::spawn(async move {
292                while let Some(request) = receiver.recv().await {
293                    match request {
294                        SyncRequest::Timer(response_sender) => {
295                            if !lock_mutex!(dirty_blocks).is_empty() {
296                                // Clear dirty blocks immediately - DETERMINISTIC RESULTS
297                                let start = std::time::Instant::now();
298                                lock_mutex!(dirty_blocks).clear();
299                                threshold_hit.store(false, Ordering::SeqCst);
300                                let elapsed = start.elapsed().as_millis() as u64;
301                                let elapsed = if elapsed == 0 { 1 } else { elapsed };
302                                last_sync_duration_ms.store(elapsed, Ordering::SeqCst);
303                                sync_count.fetch_add(1, Ordering::SeqCst);
304                                timer_sync_count.fetch_add(1, Ordering::SeqCst);
305                            }
306                            // Signal completion - AWAITABLE RESULTS
307                            let _ = response_sender.send(());
308                        }
309                        SyncRequest::Debounce(response_sender) => {
310                            if !lock_mutex!(dirty_blocks).is_empty() {
311                                // Clear dirty blocks immediately - DETERMINISTIC RESULTS
312                                let start = std::time::Instant::now();
313                                lock_mutex!(dirty_blocks).clear();
314                                threshold_hit.store(false, Ordering::SeqCst);
315                                let elapsed = start.elapsed().as_millis() as u64;
316                                let elapsed = if elapsed == 0 { 1 } else { elapsed };
317                                last_sync_duration_ms.store(elapsed, Ordering::SeqCst);
318                                sync_count.fetch_add(1, Ordering::SeqCst);
319                                debounce_sync_count.fetch_add(1, Ordering::SeqCst);
320                            }
321                            // Signal completion - AWAITABLE RESULTS
322                            let _ = response_sender.send(());
323                        }
324                    }
325                }
326            });
327
328            if tokio::runtime::Handle::try_current().is_ok() {
329                // Prefer Tokio tasks
330                if let Some(interval_ms) = policy.interval_ms {
331                    let stop = Arc::new(AtomicBool::new(false));
332                    let stop_flag = stop.clone();
333                    let dirty = Arc::clone(self.get_dirty_blocks());
334                    let sync_sender = self.sync_sender.as_ref().unwrap().clone();
335                    let mut ticker = tokio::time::interval(Duration::from_millis(interval_ms));
336                    let task = tokio::spawn(async move {
337                        loop {
338                            ticker.tick().await;
339                            if stop_flag.load(Ordering::SeqCst) {
340                                break;
341                            }
342                            // Check if sync is needed
343                            let needs_sync = {
344                                let map = dirty.lock();
345                                !map.is_empty()
346                            };
347                            if needs_sync {
348                                log::info!(
349                                    "Auto-sync (tokio-interval-policy) requesting sync and AWAITING completion"
350                                );
351                                let (response_sender, response_receiver) =
352                                    tokio::sync::oneshot::channel();
353                                if sync_sender
354                                    .send(SyncRequest::Timer(response_sender))
355                                    .is_err()
356                                {
357                                    log::error!(
358                                        "Failed to send timer sync request - channel closed"
359                                    );
360                                    break;
361                                } else {
362                                    // AWAIT the sync completion - DETERMINISTIC RESULTS
363                                    let _ = response_receiver.await;
364                                    log::info!("Auto-sync (tokio-interval-policy) sync COMPLETED");
365                                }
366                            }
367                        }
368                    });
369                    self.auto_sync_stop = Some(stop);
370                    self.tokio_timer_task = Some(task);
371                } else {
372                    self.auto_sync_stop = None;
373                }
374
375                if let Some(debounce_ms) = policy.debounce_ms {
376                    let stop_flag = self
377                        .auto_sync_stop
378                        .get_or_insert_with(|| Arc::new(AtomicBool::new(false)))
379                        .clone();
380                    let dirty = Arc::clone(self.get_dirty_blocks());
381                    let last_write = self.last_write_ms.clone();
382                    let threshold_flag = self.threshold_hit.clone();
383                    let sync_sender = self.sync_sender.as_ref().unwrap().clone();
384                    let task = tokio::spawn(async move {
385                        let sleep_step = Duration::from_millis(10);
386                        loop {
387                            if stop_flag.load(Ordering::SeqCst) {
388                                break;
389                            }
390                            if threshold_flag.load(Ordering::SeqCst) {
391                                // Use system clock based last_write; simple polling
392                                let now = super::BlockStorage::now_millis();
393                                let last = last_write.load(Ordering::SeqCst);
394                                let elapsed = now.saturating_sub(last);
395                                if elapsed >= debounce_ms {
396                                    let needs_sync = {
397                                        let map = dirty.lock();
398                                        !map.is_empty()
399                                    };
400                                    if needs_sync {
401                                        log::info!(
402                                            "Auto-sync (tokio-debounce) requesting sync after {}ms idle and AWAITING completion",
403                                            elapsed
404                                        );
405                                        let (response_sender, response_receiver) =
406                                            tokio::sync::oneshot::channel();
407                                        if sync_sender
408                                            .send(SyncRequest::Debounce(response_sender))
409                                            .is_err()
410                                        {
411                                            log::error!(
412                                                "Failed to send debounce sync request - channel closed"
413                                            );
414                                            break;
415                                        } else {
416                                            // AWAIT the sync completion - DETERMINISTIC RESULTS
417                                            let _ = response_receiver.await;
418                                            log::info!("Auto-sync (tokio-debounce) sync COMPLETED");
419                                        }
420                                    }
421                                    threshold_flag.store(false, Ordering::SeqCst);
422                                }
423                            }
424                            tokio::time::sleep(sleep_step).await;
425                        }
426                    });
427                    self.tokio_debounce_task = Some(task);
428                } else {
429                    self.tokio_debounce_task = None;
430                }
431                // Ensure std threads are not used in Tokio mode
432                self.auto_sync_thread = None;
433                self.debounce_thread = None;
434            } else {
435                // Fallback to std::thread implementation (existing)
436                if let Some(interval_ms) = policy.interval_ms {
437                    let stop = Arc::new(AtomicBool::new(false));
438                    let stop_thread = stop.clone();
439                    let dirty = Arc::clone(self.get_dirty_blocks());
440                    let interval = Duration::from_millis(interval_ms);
441                    let threshold_flag = self.threshold_hit.clone();
442                    let sync_count = self.sync_count.clone();
443                    let timer_sync_count = self.timer_sync_count.clone();
444                    let last_sync_duration_ms = self.last_sync_duration_ms.clone();
445                    let handle = std::thread::spawn(move || {
446                        while !stop_thread.load(Ordering::SeqCst) {
447                            std::thread::sleep(interval);
448                            if stop_thread.load(Ordering::SeqCst) {
449                                break;
450                            }
451                            let mut map = dirty.lock();
452                            if !map.is_empty() {
453                                let start = Instant::now();
454                                let count = map.len();
455                                log::info!(
456                                    "Auto-sync (timer-thread) flushing {} dirty blocks",
457                                    count
458                                );
459                                map.clear();
460                                threshold_flag.store(false, Ordering::SeqCst);
461                                let elapsed = start.elapsed();
462                                let ms = elapsed.as_millis() as u64;
463                                let ms = if ms == 0 { 1 } else { ms };
464                                last_sync_duration_ms.store(ms, Ordering::SeqCst);
465                                sync_count.fetch_add(1, Ordering::SeqCst);
466                                timer_sync_count.fetch_add(1, Ordering::SeqCst);
467                            }
468                        }
469                    });
470                    self.auto_sync_stop = Some(stop);
471                    self.auto_sync_thread = Some(handle);
472                } else {
473                    self.auto_sync_stop = None;
474                    self.auto_sync_thread = None;
475                }
476
477                // Debounce worker (std thread)
478                if let Some(debounce_ms) = policy.debounce_ms {
479                    let stop = self
480                        .auto_sync_stop
481                        .get_or_insert_with(|| Arc::new(AtomicBool::new(false)))
482                        .clone();
483                    let stop_thread = stop.clone();
484                    let dirty = Arc::clone(self.get_dirty_blocks());
485                    let last_write = self.last_write_ms.clone();
486                    let threshold_flag = self.threshold_hit.clone();
487                    let sync_count = self.sync_count.clone();
488                    let debounce_sync_count = self.debounce_sync_count.clone();
489                    let last_sync_duration_ms = self.last_sync_duration_ms.clone();
490                    let handle = std::thread::spawn(move || {
491                        // Polling loop to detect inactivity window after threshold
492                        let sleep_step = Duration::from_millis(10);
493                        loop {
494                            if stop_thread.load(Ordering::SeqCst) {
495                                break;
496                            }
497                            if threshold_flag.load(Ordering::SeqCst) {
498                                let now = super::BlockStorage::now_millis();
499                                let last = last_write.load(Ordering::SeqCst);
500                                let elapsed = now.saturating_sub(last);
501                                if elapsed >= debounce_ms {
502                                    // Flush
503                                    let mut map = dirty.lock();
504                                    if !map.is_empty() {
505                                        let start = Instant::now();
506                                        let count = map.len();
507                                        log::info!(
508                                            "Auto-sync (debounce-thread) flushing {} dirty blocks after {}ms idle",
509                                            count,
510                                            elapsed
511                                        );
512                                        map.clear();
513                                        let d = start.elapsed();
514                                        let ms = d.as_millis() as u64;
515                                        let ms = if ms == 0 { 1 } else { ms };
516                                        last_sync_duration_ms.store(ms, Ordering::SeqCst);
517                                    }
518                                    threshold_flag.store(false, Ordering::SeqCst);
519                                    sync_count.fetch_add(1, Ordering::SeqCst);
520                                    debounce_sync_count.fetch_add(1, Ordering::SeqCst);
521                                }
522                            }
523                            std::thread::sleep(sleep_step);
524                        }
525                    });
526                    self.debounce_thread = Some(handle);
527                } else {
528                    self.debounce_thread = None;
529                }
530            }
531        }
532    }
533
534    /// Disable automatic background syncing.
535    #[cfg(target_arch = "wasm32")]
536    pub fn disable_auto_sync(&self) {
537        *lock_mutex!(self.policy) = None;
538        *lock_mutex!(self.auto_sync_interval) = None;
539        log::info!("Auto-sync disabled");
540    }
541
542    #[cfg(not(target_arch = "wasm32"))]
543    pub fn disable_auto_sync(&mut self) {
544        *lock_mutex!(self.auto_sync_interval) = None;
545        log::info!("Auto-sync disabled");
546
547        #[cfg(target_arch = "wasm32")]
548        {
549            // Unregister WASM auto-sync
550            super::wasm_auto_sync::unregister_wasm_auto_sync(&self.db_name);
551        }
552
553        #[cfg(not(target_arch = "wasm32"))]
554        {
555            if let Some(stop) = &self.auto_sync_stop {
556                stop.store(true, Ordering::SeqCst);
557            }
558            if let Some(handle) = self.auto_sync_thread.take() {
559                let _ = handle.join();
560            }
561            if let Some(handle) = self.debounce_thread.take() {
562                let _ = handle.join();
563            }
564            if let Some(task) = self.tokio_timer_task.take() {
565                task.abort();
566            }
567            if let Some(task) = self.tokio_debounce_task.take() {
568                task.abort();
569            }
570            self.auto_sync_stop = None;
571        }
572    }
573
574    /// Get the number of completed sync operations (native only metric)
575    #[cfg(not(target_arch = "wasm32"))]
576    pub fn get_sync_count(&self) -> u64 {
577        self.sync_count.load(Ordering::SeqCst)
578    }
579
580    /// Get the number of timer-based background syncs
581    #[cfg(not(target_arch = "wasm32"))]
582    pub fn get_timer_sync_count(&self) -> u64 {
583        self.timer_sync_count.load(Ordering::SeqCst)
584    }
585
586    /// Get the number of debounce-based background syncs
587    #[cfg(not(target_arch = "wasm32"))]
588    pub fn get_debounce_sync_count(&self) -> u64 {
589        self.debounce_sync_count.load(Ordering::SeqCst)
590    }
591
592    /// Get the duration in ms of the last sync operation (>=1 when a sync occurs)
593    #[cfg(not(target_arch = "wasm32"))]
594    pub fn get_last_sync_duration_ms(&self) -> u64 {
595        self.last_sync_duration_ms.load(Ordering::SeqCst)
596    }
597
598    #[cfg(target_arch = "wasm32")]
599    pub(super) fn maybe_auto_sync(&self) {
600        // Check if we should trigger threshold-based sync
601        if let Some(policy) = lock_mutex!(self.policy).clone() {
602            let dirty_count = self.get_dirty_count();
603            let dirty_bytes = dirty_count * super::BLOCK_SIZE;
604
605            // Check max_dirty threshold
606            if let Some(max_dirty) = policy.max_dirty {
607                if dirty_count >= max_dirty {
608                    log::info!(
609                        "WASM threshold sync triggered: {} dirty blocks >= {}",
610                        dirty_count,
611                        max_dirty
612                    );
613                    // Spawn async sync
614                    let db_name = self.db_name.clone();
615                    wasm_bindgen_futures::spawn_local(async move {
616                        if let Ok(storage) = super::BlockStorage::new(&db_name).await {
617                            if let Err(e) = storage.sync().await {
618                                log::error!("WASM threshold sync failed: {}", e.message);
619                            }
620                        }
621                    });
622                    return;
623                }
624            }
625
626            // Check max_dirty_bytes threshold
627            if let Some(max_bytes) = policy.max_dirty_bytes {
628                if dirty_bytes >= max_bytes {
629                    log::info!(
630                        "WASM threshold sync triggered: {} dirty bytes >= {}",
631                        dirty_bytes,
632                        max_bytes
633                    );
634                    // Spawn async sync
635                    let db_name = self.db_name.clone();
636                    wasm_bindgen_futures::spawn_local(async move {
637                        if let Ok(storage) = super::BlockStorage::new(&db_name).await {
638                            if let Err(e) = storage.sync().await {
639                                log::error!("WASM threshold sync failed: {}", e.message);
640                            }
641                        }
642                    });
643                }
644            }
645        }
646    }
647
    /// Native counterpart of the WASM threshold check; intentionally a no-op.
    #[cfg(not(target_arch = "wasm32"))]
    pub(super) fn maybe_auto_sync(&self) {
        // On native targets the background workers started by the enable
        // functions in this file perform the sync, so there is nothing to do
        // here. The function is kept so callers compile on both targets.
    }
653}