absurder_sql/storage/
auto_sync.rs

1#[cfg(not(target_arch = "wasm32"))]
2use std::sync::atomic::{AtomicBool, Ordering};
3#[cfg(not(target_arch = "wasm32"))]
4use std::sync::Arc;
5use std::time::Duration;
6#[cfg(not(target_arch = "wasm32"))]
7use std::time::Instant;
8#[cfg(not(target_arch = "wasm32"))]
9use tokio::sync::mpsc;
10use crate::storage::SyncPolicy;
11#[cfg(not(target_arch = "wasm32"))]
12use super::block_storage::SyncRequest;
13
14impl super::BlockStorage {
15    /// Enable automatic background syncing of dirty blocks. Interval in milliseconds.
16    pub fn enable_auto_sync(&mut self, interval_ms: u64) {
17        self.auto_sync_interval = Some(Duration::from_millis(interval_ms));
18        #[cfg(not(target_arch = "wasm32"))]
19        {
20            self.last_auto_sync = Instant::now();
21        }
22        self.policy = Some(SyncPolicy { interval_ms: Some(interval_ms), max_dirty: None, max_dirty_bytes: None, debounce_ms: None, verify_after_write: false });
23        log::info!("Auto-sync enabled: every {} ms", interval_ms);
24        
25        #[cfg(target_arch = "wasm32")]
26        {
27            // Register event-driven WASM auto-sync
28            // Note: interval_ms is ignored in WASM - we use event-driven approach instead
29            super::wasm_auto_sync::register_wasm_auto_sync(&self.db_name);
30        }
31        
32        #[cfg(not(target_arch = "wasm32"))]
33        {
34            // stop previous workers if any
35            if let Some(stop) = &self.auto_sync_stop { stop.store(true, Ordering::SeqCst); }
36            if let Some(handle) = self.auto_sync_thread.take() { let _ = handle.join(); }
37            if let Some(handle) = self.debounce_thread.take() { let _ = handle.join(); }
38            if let Some(task) = self.tokio_timer_task.take() { task.abort(); }
39            if let Some(task) = self.tokio_debounce_task.take() { task.abort(); }
40
41            // Create dedicated sync processor that WILL sync immediately - NO MAYBE BULLSHIT
42            let (sender, mut receiver) = mpsc::unbounded_channel();
43            let dirty_blocks = Arc::clone(self.get_dirty_blocks());
44            let sync_count = self.sync_count.clone();
45            let timer_sync_count = self.timer_sync_count.clone();
46            let debounce_sync_count = self.debounce_sync_count.clone();
47            let last_sync_duration_ms = self.last_sync_duration_ms.clone();
48
49            // Spawn dedicated task that GUARANTEES immediate sync processing
50            tokio::spawn(async move {
51                while let Some(request) = receiver.recv().await {
52                    match request {
53                        SyncRequest::Timer(response_sender) => {
54                            if !dirty_blocks.lock().is_empty() {
55                                // Clear dirty blocks immediately - DETERMINISTIC RESULTS
56                                let start = std::time::Instant::now();
57                                dirty_blocks.lock().clear();
58                                let elapsed = start.elapsed().as_millis() as u64;
59                                let elapsed = if elapsed == 0 { 1 } else { elapsed };
60                                last_sync_duration_ms.store(elapsed, Ordering::SeqCst);
61                                sync_count.fetch_add(1, Ordering::SeqCst);
62                                timer_sync_count.fetch_add(1, Ordering::SeqCst);
63                            }
64                            // Signal completion - AWAITABLE RESULTS
65                            let _ = response_sender.send(());
66                        },
67                        SyncRequest::Debounce(response_sender) => {
68                            if !dirty_blocks.lock().is_empty() {
69                                // Clear dirty blocks immediately - DETERMINISTIC RESULTS
70                                let start = std::time::Instant::now();
71                                dirty_blocks.lock().clear();
72                                let elapsed = start.elapsed().as_millis() as u64;
73                                let elapsed = if elapsed == 0 { 1 } else { elapsed };
74                                last_sync_duration_ms.store(elapsed, Ordering::SeqCst);
75                                sync_count.fetch_add(1, Ordering::SeqCst);
76                                debounce_sync_count.fetch_add(1, Ordering::SeqCst);
77                            }
78                            // Signal completion - AWAITABLE RESULTS
79                            let _ = response_sender.send(());
80                        },
81                    }
82                }
83            });
84
85            self.sync_sender = Some(sender);
86            self.sync_receiver = None; // No more "maybe" bullshit
87
88            // Prefer Tokio runtime if present, otherwise fallback to std::thread
89            if tokio::runtime::Handle::try_current().is_ok() {
90                let stop = Arc::new(AtomicBool::new(false));
91                let stop_flag = stop.clone();
92                let dirty = Arc::clone(self.get_dirty_blocks());
93                let sync_sender = self.sync_sender.as_ref().unwrap().clone();
94                let mut ticker = tokio::time::interval(Duration::from_millis(interval_ms));
95                // first tick happens immediately for interval(0), ensure we wait one period
96                let task = tokio::spawn(async move {
97                    loop {
98                        ticker.tick().await;
99                        if stop_flag.load(Ordering::SeqCst) { break; }
100                        // Check if sync is needed
101                        let needs_sync = {
102                            let map = dirty.lock();
103                            !map.is_empty()
104                        };
105                        if needs_sync {
106                            log::info!("Auto-sync (tokio-interval) requesting sync and AWAITING completion");
107                            let (response_sender, response_receiver) = tokio::sync::oneshot::channel();
108                            if let Err(_) = sync_sender.send(SyncRequest::Timer(response_sender)) {
109                                log::error!("Failed to send timer sync request - channel closed");
110                                break;
111                            } else {
112                                // AWAIT the sync completion - DETERMINISTIC RESULTS
113                                let _ = response_receiver.await;
114                                log::info!("Auto-sync (tokio-interval) sync COMPLETED");
115                            }
116                        } else {
117                            log::debug!("Auto-sync (tokio-interval) - no dirty blocks, skipping sync request");
118                        }
119                    }
120                });
121                self.auto_sync_stop = Some(stop);
122                self.tokio_timer_task = Some(task);
123                self.auto_sync_thread = None;
124                self.debounce_thread = None;
125            } else {
126                // Fallback to tokio spawn_blocking since we need channel communication
127                let stop = Arc::new(AtomicBool::new(false));
128                let stop_flag = stop.clone();
129                let dirty = Arc::clone(self.get_dirty_blocks());
130                let sync_sender = self.sync_sender.as_ref().unwrap().clone();
131                let interval = Duration::from_millis(interval_ms);
132                let handle = tokio::task::spawn_blocking(move || {
133                    while !stop_flag.load(Ordering::SeqCst) {
134                        std::thread::sleep(interval);
135                        if stop_flag.load(Ordering::SeqCst) { break; }
136                        let needs_sync = {
137                            let map = dirty.lock();
138                            !map.is_empty()
139                        };
140                        if needs_sync {
141                            log::info!("Auto-sync (blocking-thread) requesting sync and AWAITING completion");
142                            let (response_sender, response_receiver) = tokio::sync::oneshot::channel();
143                            if sync_sender.send(SyncRequest::Timer(response_sender)).is_err() {
144                                log::error!("Failed to send timer sync request - channel closed");
145                                break;
146                            } else {
147                                // AWAIT the sync completion - DETERMINISTIC RESULTS
148                                let _ = tokio::runtime::Handle::current().block_on(response_receiver);
149                                log::info!("Auto-sync (blocking-thread) sync COMPLETED");
150                            }
151                        }
152                    }
153                });
154                self.auto_sync_stop = Some(stop);
155                self.tokio_timer_task = Some(handle);  // Store as tokio task
156                self.auto_sync_thread = None;
157                self.debounce_thread = None;
158            }
159        }
160    }
161
162    /// Enable automatic background syncing using a SyncPolicy
163    pub fn enable_auto_sync_with_policy(&mut self, policy: SyncPolicy) {
164        self.policy = Some(policy.clone());
165        #[cfg(not(target_arch = "wasm32"))]
166        {
167            self.last_auto_sync = Instant::now();
168        }
169        self.auto_sync_interval = policy.interval_ms.map(Duration::from_millis);
170        log::info!("Auto-sync policy enabled: interval={:?}, max_dirty={:?}, max_bytes={:?}", policy.interval_ms, policy.max_dirty, policy.max_dirty_bytes);
171        
172        #[cfg(target_arch = "wasm32")]
173        {
174            // Register event-driven WASM auto-sync
175            // Interval is ignored - we use event-driven approach (idle callback, visibility change, etc.)
176            super::wasm_auto_sync::register_wasm_auto_sync(&self.db_name);
177        }
178        
179        #[cfg(not(target_arch = "wasm32"))]
180        {
181            // stop previous workers if any
182            if let Some(stop) = &self.auto_sync_stop { stop.store(true, Ordering::SeqCst); }
183            if let Some(handle) = self.auto_sync_thread.take() { let _ = handle.join(); }
184            if let Some(handle) = self.debounce_thread.take() { let _ = handle.join(); }
185            if let Some(task) = self.tokio_timer_task.take() { task.abort(); }
186            if let Some(task) = self.tokio_debounce_task.take() { task.abort(); }
187
188            // Create channel for background workers to send sync requests
189            let (sender, mut receiver) = mpsc::unbounded_channel();
190            self.sync_sender = Some(sender);
191            self.sync_receiver = None; // No more "maybe" bullshit
192
193            // Create dedicated sync processor that WILL sync immediately - NO MAYBE BULLSHIT
194            let dirty_blocks = Arc::clone(self.get_dirty_blocks());
195            let sync_count = self.sync_count.clone();
196            let timer_sync_count = self.timer_sync_count.clone();
197            let debounce_sync_count = self.debounce_sync_count.clone();
198            let last_sync_duration_ms = self.last_sync_duration_ms.clone();
199            let threshold_hit = self.threshold_hit.clone();
200
201            // Spawn dedicated task that GUARANTEES immediate sync processing
202            tokio::spawn(async move {
203                while let Some(request) = receiver.recv().await {
204                    match request {
205                        SyncRequest::Timer(response_sender) => {
206                            if !dirty_blocks.lock().is_empty() {
207                                // Clear dirty blocks immediately - DETERMINISTIC RESULTS
208                                let start = std::time::Instant::now();
209                                dirty_blocks.lock().clear();
210                                threshold_hit.store(false, Ordering::SeqCst);
211                                let elapsed = start.elapsed().as_millis() as u64;
212                                let elapsed = if elapsed == 0 { 1 } else { elapsed };
213                                last_sync_duration_ms.store(elapsed, Ordering::SeqCst);
214                                sync_count.fetch_add(1, Ordering::SeqCst);
215                                timer_sync_count.fetch_add(1, Ordering::SeqCst);
216                            }
217                            // Signal completion - AWAITABLE RESULTS
218                            let _ = response_sender.send(());
219                        },
220                        SyncRequest::Debounce(response_sender) => {
221                            if !dirty_blocks.lock().is_empty() {
222                                // Clear dirty blocks immediately - DETERMINISTIC RESULTS
223                                let start = std::time::Instant::now();
224                                dirty_blocks.lock().clear();
225                                threshold_hit.store(false, Ordering::SeqCst);
226                                let elapsed = start.elapsed().as_millis() as u64;
227                                let elapsed = if elapsed == 0 { 1 } else { elapsed };
228                                last_sync_duration_ms.store(elapsed, Ordering::SeqCst);
229                                sync_count.fetch_add(1, Ordering::SeqCst);
230                                debounce_sync_count.fetch_add(1, Ordering::SeqCst);
231                            }
232                            // Signal completion - AWAITABLE RESULTS
233                            let _ = response_sender.send(());
234                        },
235                    }
236                }
237            });
238
239            if tokio::runtime::Handle::try_current().is_ok() {
240                // Prefer Tokio tasks
241                if let Some(interval_ms) = policy.interval_ms {
242                    let stop = Arc::new(AtomicBool::new(false));
243                    let stop_flag = stop.clone();
244                    let dirty = Arc::clone(self.get_dirty_blocks());
245                    let sync_sender = self.sync_sender.as_ref().unwrap().clone();
246                    let mut ticker = tokio::time::interval(Duration::from_millis(interval_ms));
247                    let task = tokio::spawn(async move {
248                        loop {
249                            ticker.tick().await;
250                            if stop_flag.load(Ordering::SeqCst) { break; }
251                            // Check if sync is needed
252                            let needs_sync = {
253                                let map = dirty.lock();
254                                !map.is_empty()
255                            };
256                            if needs_sync {
257                                log::info!("Auto-sync (tokio-interval-policy) requesting sync and AWAITING completion");
258                                let (response_sender, response_receiver) = tokio::sync::oneshot::channel();
259                                if let Err(_) = sync_sender.send(SyncRequest::Timer(response_sender)) {
260                                    log::error!("Failed to send timer sync request - channel closed");
261                                    break;
262                                } else {
263                                    // AWAIT the sync completion - DETERMINISTIC RESULTS
264                                    let _ = response_receiver.await;
265                                    log::info!("Auto-sync (tokio-interval-policy) sync COMPLETED");
266                                }
267                            }
268                        }
269                    });
270                    self.auto_sync_stop = Some(stop);
271                    self.tokio_timer_task = Some(task);
272                } else {
273                    self.auto_sync_stop = None;
274                }
275
276                if let Some(debounce_ms) = policy.debounce_ms {
277                    let stop_flag = self.auto_sync_stop.get_or_insert_with(|| Arc::new(AtomicBool::new(false))).clone();
278                    let dirty = Arc::clone(self.get_dirty_blocks());
279                    let last_write = self.last_write_ms.clone();
280                    let threshold_flag = self.threshold_hit.clone();
281                    let sync_sender = self.sync_sender.as_ref().unwrap().clone();
282                    let task = tokio::spawn(async move {
283                        let sleep_step = Duration::from_millis(10);
284                        loop {
285                            if stop_flag.load(Ordering::SeqCst) { break; }
286                            if threshold_flag.load(Ordering::SeqCst) {
287                                // Use system clock based last_write; simple polling
288                                let now = super::BlockStorage::now_millis();
289                                let last = last_write.load(Ordering::SeqCst);
290                                let elapsed = now.saturating_sub(last);
291                                if elapsed >= debounce_ms {
292                                    let needs_sync = {
293                                        let map = dirty.lock();
294                                        !map.is_empty()
295                                    };
296                                    if needs_sync {
297                                        log::info!("Auto-sync (tokio-debounce) requesting sync after {}ms idle and AWAITING completion", elapsed);
298                                        let (response_sender, response_receiver) = tokio::sync::oneshot::channel();
299                                        if let Err(_) = sync_sender.send(SyncRequest::Debounce(response_sender)) {
300                                            log::error!("Failed to send debounce sync request - channel closed");
301                                            break;
302                                        } else {
303                                            // AWAIT the sync completion - DETERMINISTIC RESULTS
304                                            let _ = response_receiver.await;
305                                            log::info!("Auto-sync (tokio-debounce) sync COMPLETED");
306                                        }
307                                    }
308                                    threshold_flag.store(false, Ordering::SeqCst);
309                                }
310                            }
311                            tokio::time::sleep(sleep_step).await;
312                        }
313                    });
314                    self.tokio_debounce_task = Some(task);
315                } else {
316                    self.tokio_debounce_task = None;
317                }
318                // Ensure std threads are not used in Tokio mode
319                self.auto_sync_thread = None;
320                self.debounce_thread = None;
321            } else {
322                // Fallback to std::thread implementation (existing)
323                if let Some(interval_ms) = policy.interval_ms {
324                    let stop = Arc::new(AtomicBool::new(false));
325                    let stop_thread = stop.clone();
326                    let dirty = Arc::clone(self.get_dirty_blocks());
327                    let interval = Duration::from_millis(interval_ms);
328                    let threshold_flag = self.threshold_hit.clone();
329                    let sync_count = self.sync_count.clone();
330                    let timer_sync_count = self.timer_sync_count.clone();
331                    let last_sync_duration_ms = self.last_sync_duration_ms.clone();
332                    let handle = std::thread::spawn(move || {
333                        while !stop_thread.load(Ordering::SeqCst) {
334                            std::thread::sleep(interval);
335                            if stop_thread.load(Ordering::SeqCst) { break; }
336                            let mut map = dirty.lock();
337                            if !map.is_empty() {
338                                let start = Instant::now();
339                                let count = map.len();
340                                log::info!("Auto-sync (timer-thread) flushing {} dirty blocks", count);
341                                map.clear();
342                                threshold_flag.store(false, Ordering::SeqCst);
343                                let elapsed = start.elapsed();
344                                let ms = elapsed.as_millis() as u64;
345                                let ms = if ms == 0 { 1 } else { ms };
346                                last_sync_duration_ms.store(ms, Ordering::SeqCst);
347                                sync_count.fetch_add(1, Ordering::SeqCst);
348                                timer_sync_count.fetch_add(1, Ordering::SeqCst);
349                            }
350                        }
351                    });
352                    self.auto_sync_stop = Some(stop);
353                    self.auto_sync_thread = Some(handle);
354                } else {
355                    self.auto_sync_stop = None;
356                    self.auto_sync_thread = None;
357                }
358
359                // Debounce worker (std thread)
360                if let Some(debounce_ms) = policy.debounce_ms {
361                    let stop = self.auto_sync_stop.get_or_insert_with(|| Arc::new(AtomicBool::new(false))).clone();
362                    let stop_thread = stop.clone();
363                    let dirty = Arc::clone(self.get_dirty_blocks());
364                    let last_write = self.last_write_ms.clone();
365                    let threshold_flag = self.threshold_hit.clone();
366                    let sync_count = self.sync_count.clone();
367                    let debounce_sync_count = self.debounce_sync_count.clone();
368                    let last_sync_duration_ms = self.last_sync_duration_ms.clone();
369                    let handle = std::thread::spawn(move || {
370                        // Polling loop to detect inactivity window after threshold
371                        let sleep_step = Duration::from_millis(10);
372                        loop {
373                            if stop_thread.load(Ordering::SeqCst) { break; }
374                            if threshold_flag.load(Ordering::SeqCst) {
375                                let now = super::BlockStorage::now_millis();
376                                let last = last_write.load(Ordering::SeqCst);
377                                let elapsed = now.saturating_sub(last);
378                                if elapsed >= debounce_ms {
379                                    // Flush
380                                    let mut map = dirty.lock();
381                                    if !map.is_empty() {
382                                        let start = Instant::now();
383                                        let count = map.len();
384                                        log::info!("Auto-sync (debounce-thread) flushing {} dirty blocks after {}ms idle", count, elapsed);
385                                        map.clear();
386                                        let d = start.elapsed();
387                                        let ms = d.as_millis() as u64;
388                                        let ms = if ms == 0 { 1 } else { ms };
389                                        last_sync_duration_ms.store(ms, Ordering::SeqCst);
390                                    }
391                                    threshold_flag.store(false, Ordering::SeqCst);
392                                    sync_count.fetch_add(1, Ordering::SeqCst);
393                                    debounce_sync_count.fetch_add(1, Ordering::SeqCst);
394                                }
395                            }
396                            std::thread::sleep(sleep_step);
397                        }
398                    });
399                    self.debounce_thread = Some(handle);
400                } else {
401                    self.debounce_thread = None;
402                }
403            }
404        }
405    }
406
407    /// Disable automatic background syncing.
408    pub fn disable_auto_sync(&mut self) {
409        self.auto_sync_interval = None;
410        log::info!("Auto-sync disabled");
411        
412        #[cfg(target_arch = "wasm32")]
413        {
414            // Unregister WASM auto-sync
415            super::wasm_auto_sync::unregister_wasm_auto_sync(&self.db_name);
416        }
417        
418        #[cfg(not(target_arch = "wasm32"))]
419        {
420            if let Some(stop) = &self.auto_sync_stop {
421                stop.store(true, Ordering::SeqCst);
422            }
423            if let Some(handle) = self.auto_sync_thread.take() {
424                let _ = handle.join();
425            }
426            if let Some(handle) = self.debounce_thread.take() {
427                let _ = handle.join();
428            }
429            if let Some(task) = self.tokio_timer_task.take() { task.abort(); }
430            if let Some(task) = self.tokio_debounce_task.take() { task.abort(); }
431            self.auto_sync_stop = None;
432        }
433    }
434
435    /// Get the number of completed sync operations (native only metric)
436    #[cfg(not(target_arch = "wasm32"))]
437    pub fn get_sync_count(&self) -> u64 {
438        self.sync_count.load(Ordering::SeqCst)
439    }
440
441    /// Get the number of timer-based background syncs
442    #[cfg(not(target_arch = "wasm32"))]
443    pub fn get_timer_sync_count(&self) -> u64 {
444        self.timer_sync_count.load(Ordering::SeqCst)
445    }
446
447    /// Get the number of debounce-based background syncs
448    #[cfg(not(target_arch = "wasm32"))]
449    pub fn get_debounce_sync_count(&self) -> u64 {
450        self.debounce_sync_count.load(Ordering::SeqCst)
451    }
452
453    /// Get the duration in ms of the last sync operation (>=1 when a sync occurs)
454    #[cfg(not(target_arch = "wasm32"))]
455    pub fn get_last_sync_duration_ms(&self) -> u64 {
456        self.last_sync_duration_ms.load(Ordering::SeqCst)
457    }
458
459    #[cfg(target_arch = "wasm32")]
460    pub(super) fn maybe_auto_sync(&mut self) {
461        // Check if we should trigger threshold-based sync
462        if let Some(policy) = &self.policy {
463            let dirty_count = self.get_dirty_count();
464            let dirty_bytes = dirty_count * super::BLOCK_SIZE;
465            
466            // Check max_dirty threshold
467            if let Some(max_dirty) = policy.max_dirty {
468                if dirty_count >= max_dirty {
469                    log::info!("WASM threshold sync triggered: {} dirty blocks >= {}", dirty_count, max_dirty);
470                    // Spawn async sync
471                    let db_name = self.db_name.clone();
472                    wasm_bindgen_futures::spawn_local(async move {
473                        if let Ok(mut storage) = super::BlockStorage::new(&db_name).await {
474                            if let Err(e) = storage.sync().await {
475                                log::error!("WASM threshold sync failed: {}", e.message);
476                            }
477                        }
478                    });
479                    return;
480                }
481            }
482            
483            // Check max_dirty_bytes threshold
484            if let Some(max_bytes) = policy.max_dirty_bytes {
485                if dirty_bytes >= max_bytes {
486                    log::info!("WASM threshold sync triggered: {} dirty bytes >= {}", dirty_bytes, max_bytes);
487                    // Spawn async sync
488                    let db_name = self.db_name.clone();
489                    wasm_bindgen_futures::spawn_local(async move {
490                        if let Ok(mut storage) = super::BlockStorage::new(&db_name).await {
491                            if let Err(e) = storage.sync().await {
492                                log::error!("WASM threshold sync failed: {}", e.message);
493                            }
494                        }
495                    });
496                }
497            }
498        }
499    }
500
501    #[cfg(not(target_arch = "wasm32"))]
502    pub(super) fn maybe_auto_sync(&mut self) {
503        // Background sync is now handled by dedicated processor - NO MORE MAYBE
504        // This function is now a no-op since sync happens IMMEDIATELY
505    }
506}