absurder_sql/storage/
auto_sync.rs1#[allow(unused_macros)]
3#[cfg(target_arch = "wasm32")]
4macro_rules! lock_mutex {
5 ($mutex:expr) => {
6 $mutex
7 .try_borrow_mut()
8 .expect("RefCell borrow failed - reentrancy detected in auto_sync.rs")
9 };
10}
11
12#[allow(unused_macros)]
13#[cfg(not(target_arch = "wasm32"))]
14macro_rules! lock_mutex {
15 ($mutex:expr) => {
16 $mutex.lock()
17 };
18}
19
20#[cfg(not(target_arch = "wasm32"))]
21use super::block_storage::SyncRequest;
22use crate::storage::SyncPolicy;
23#[cfg(not(target_arch = "wasm32"))]
24use std::sync::Arc;
25#[cfg(not(target_arch = "wasm32"))]
26use std::sync::atomic::{AtomicBool, Ordering};
27#[cfg(not(target_arch = "wasm32"))]
28use std::time::{Duration, Instant};
29#[cfg(not(target_arch = "wasm32"))]
30use tokio::sync::mpsc;
31
32impl super::BlockStorage {
33 #[cfg(target_arch = "wasm32")]
38 pub fn enable_auto_sync(&self, interval_ms: u64) {
39 *lock_mutex!(self.policy) = Some(SyncPolicy {
40 interval_ms: Some(interval_ms),
41 max_dirty: None,
42 max_dirty_bytes: None,
43 debounce_ms: None,
44 verify_after_write: false,
45 });
46 *lock_mutex!(self.auto_sync_interval) = Some(std::time::Duration::from_millis(interval_ms));
47 log::info!("Auto-sync enabled: every {} ms", interval_ms);
48 }
49
50 #[cfg(not(target_arch = "wasm32"))]
51 pub fn enable_auto_sync(&mut self, interval_ms: u64) {
52 *lock_mutex!(self.policy) = Some(SyncPolicy {
53 interval_ms: Some(interval_ms),
54 max_dirty: None,
55 max_dirty_bytes: None,
56 debounce_ms: None,
57 verify_after_write: false,
58 });
59 log::info!("Auto-sync enabled: every {} ms", interval_ms);
60
61 #[cfg(target_arch = "wasm32")]
62 {
63 super::wasm_auto_sync::register_wasm_auto_sync(&self.db_name);
66 }
67
68 #[cfg(not(target_arch = "wasm32"))]
69 {
70 if let Some(stop) = &self.auto_sync_stop {
72 stop.store(true, Ordering::SeqCst);
73 }
74 if let Some(handle) = self.auto_sync_thread.take() {
75 let _ = handle.join();
76 }
77 if let Some(handle) = self.debounce_thread.take() {
78 let _ = handle.join();
79 }
80 if let Some(task) = self.tokio_timer_task.take() {
81 task.abort();
82 }
83 if let Some(task) = self.tokio_debounce_task.take() {
84 task.abort();
85 }
86
87 let (sender, mut receiver) = mpsc::unbounded_channel();
89 let dirty_blocks = Arc::clone(self.get_dirty_blocks());
90 let sync_count = self.sync_count.clone();
91 let timer_sync_count = self.timer_sync_count.clone();
92 let debounce_sync_count = self.debounce_sync_count.clone();
93 let last_sync_duration_ms = self.last_sync_duration_ms.clone();
94
95 tokio::spawn(async move {
97 while let Some(request) = receiver.recv().await {
98 match request {
99 SyncRequest::Timer(response_sender) => {
100 if !lock_mutex!(dirty_blocks).is_empty() {
101 let start = std::time::Instant::now();
103 lock_mutex!(dirty_blocks).clear();
104 let elapsed = start.elapsed().as_millis() as u64;
105 let elapsed = if elapsed == 0 { 1 } else { elapsed };
106 last_sync_duration_ms.store(elapsed, Ordering::SeqCst);
107 sync_count.fetch_add(1, Ordering::SeqCst);
108 timer_sync_count.fetch_add(1, Ordering::SeqCst);
109 }
110 let _ = response_sender.send(());
112 }
113 SyncRequest::Debounce(response_sender) => {
114 if !lock_mutex!(dirty_blocks).is_empty() {
115 let start = std::time::Instant::now();
117 lock_mutex!(dirty_blocks).clear();
118 let elapsed = start.elapsed().as_millis() as u64;
119 let elapsed = if elapsed == 0 { 1 } else { elapsed };
120 last_sync_duration_ms.store(elapsed, Ordering::SeqCst);
121 sync_count.fetch_add(1, Ordering::SeqCst);
122 debounce_sync_count.fetch_add(1, Ordering::SeqCst);
123 }
124 let _ = response_sender.send(());
126 }
127 }
128 }
129 });
130
131 self.sync_sender = Some(sender);
132 self.sync_receiver = None; if tokio::runtime::Handle::try_current().is_ok() {
136 let stop = Arc::new(AtomicBool::new(false));
137 let stop_flag = stop.clone();
138 let dirty = Arc::clone(self.get_dirty_blocks());
139 let sync_sender = self.sync_sender.as_ref().unwrap().clone();
140 let mut ticker = tokio::time::interval(Duration::from_millis(interval_ms));
141 let task = tokio::spawn(async move {
143 loop {
144 ticker.tick().await;
145 if stop_flag.load(Ordering::SeqCst) {
146 break;
147 }
148 let needs_sync = {
150 let map = lock_mutex!(dirty);
151 !map.is_empty()
152 };
153 if needs_sync {
154 log::info!(
155 "Auto-sync (tokio-interval) requesting sync and AWAITING completion"
156 );
157 let (response_sender, response_receiver) =
158 tokio::sync::oneshot::channel();
159 if sync_sender
160 .send(SyncRequest::Timer(response_sender))
161 .is_err()
162 {
163 log::error!("Failed to send timer sync request - channel closed");
164 break;
165 } else {
166 let _ = response_receiver.await;
168 log::info!("Auto-sync (tokio-interval) sync COMPLETED");
169 }
170 } else {
171 log::debug!(
172 "Auto-sync (tokio-interval) - no dirty blocks, skipping sync request"
173 );
174 }
175 }
176 });
177 self.auto_sync_stop = Some(stop);
178 self.tokio_timer_task = Some(task);
179 self.auto_sync_thread = None;
180 self.debounce_thread = None;
181 } else {
182 let stop = Arc::new(AtomicBool::new(false));
184 let stop_flag = stop.clone();
185 let dirty = Arc::clone(self.get_dirty_blocks());
186 let sync_sender = self.sync_sender.as_ref().unwrap().clone();
187 let interval = Duration::from_millis(interval_ms);
188 let handle = tokio::task::spawn_blocking(move || {
189 while !stop_flag.load(Ordering::SeqCst) {
190 std::thread::sleep(interval);
191 if stop_flag.load(Ordering::SeqCst) {
192 break;
193 }
194 let needs_sync = {
195 let map = lock_mutex!(dirty);
196 !map.is_empty()
197 };
198 if needs_sync {
199 log::info!(
200 "Auto-sync (blocking-thread) requesting sync and AWAITING completion"
201 );
202 let (response_sender, response_receiver) =
203 tokio::sync::oneshot::channel();
204 if sync_sender
205 .send(SyncRequest::Timer(response_sender))
206 .is_err()
207 {
208 log::error!("Failed to send timer sync request - channel closed");
209 break;
210 } else {
211 let _ =
213 tokio::runtime::Handle::current().block_on(response_receiver);
214 log::info!("Auto-sync (blocking-thread) sync COMPLETED");
215 }
216 }
217 }
218 });
219 self.auto_sync_stop = Some(stop);
220 self.tokio_timer_task = Some(handle); self.auto_sync_thread = None;
222 self.debounce_thread = None;
223 }
224 }
225 }
226
227 #[cfg(target_arch = "wasm32")]
229 pub fn enable_auto_sync_with_policy(&self, policy: SyncPolicy) {
230 *lock_mutex!(self.policy) = Some(policy.clone());
231 *lock_mutex!(self.auto_sync_interval) =
232 policy.interval_ms.map(std::time::Duration::from_millis);
233 log::info!("Auto-sync policy enabled");
234 }
235
236 #[cfg(not(target_arch = "wasm32"))]
237 pub fn enable_auto_sync_with_policy(&mut self, policy: SyncPolicy) {
238 *lock_mutex!(self.policy) = Some(policy.clone());
239 #[cfg(not(target_arch = "wasm32"))]
240 {
241 self.last_auto_sync = Instant::now();
242 }
243 *lock_mutex!(self.auto_sync_interval) = policy.interval_ms.map(Duration::from_millis);
244 log::info!(
245 "Auto-sync policy enabled: interval={:?}, max_dirty={:?}, max_bytes={:?}",
246 policy.interval_ms,
247 policy.max_dirty,
248 policy.max_dirty_bytes
249 );
250
251 #[cfg(target_arch = "wasm32")]
252 {
253 super::wasm_auto_sync::register_wasm_auto_sync(&self.db_name);
256 }
257
258 #[cfg(not(target_arch = "wasm32"))]
259 {
260 if let Some(stop) = &self.auto_sync_stop {
262 stop.store(true, Ordering::SeqCst);
263 }
264 if let Some(handle) = self.auto_sync_thread.take() {
265 let _ = handle.join();
266 }
267 if let Some(handle) = self.debounce_thread.take() {
268 let _ = handle.join();
269 }
270 if let Some(task) = self.tokio_timer_task.take() {
271 task.abort();
272 }
273 if let Some(task) = self.tokio_debounce_task.take() {
274 task.abort();
275 }
276
277 let (sender, mut receiver) = mpsc::unbounded_channel();
279 self.sync_sender = Some(sender);
280 self.sync_receiver = None; let dirty_blocks = Arc::clone(self.get_dirty_blocks());
284 let sync_count = self.sync_count.clone();
285 let timer_sync_count = self.timer_sync_count.clone();
286 let debounce_sync_count = self.debounce_sync_count.clone();
287 let last_sync_duration_ms = self.last_sync_duration_ms.clone();
288 let threshold_hit = self.threshold_hit.clone();
289
290 tokio::spawn(async move {
292 while let Some(request) = receiver.recv().await {
293 match request {
294 SyncRequest::Timer(response_sender) => {
295 if !lock_mutex!(dirty_blocks).is_empty() {
296 let start = std::time::Instant::now();
298 lock_mutex!(dirty_blocks).clear();
299 threshold_hit.store(false, Ordering::SeqCst);
300 let elapsed = start.elapsed().as_millis() as u64;
301 let elapsed = if elapsed == 0 { 1 } else { elapsed };
302 last_sync_duration_ms.store(elapsed, Ordering::SeqCst);
303 sync_count.fetch_add(1, Ordering::SeqCst);
304 timer_sync_count.fetch_add(1, Ordering::SeqCst);
305 }
306 let _ = response_sender.send(());
308 }
309 SyncRequest::Debounce(response_sender) => {
310 if !lock_mutex!(dirty_blocks).is_empty() {
311 let start = std::time::Instant::now();
313 lock_mutex!(dirty_blocks).clear();
314 threshold_hit.store(false, Ordering::SeqCst);
315 let elapsed = start.elapsed().as_millis() as u64;
316 let elapsed = if elapsed == 0 { 1 } else { elapsed };
317 last_sync_duration_ms.store(elapsed, Ordering::SeqCst);
318 sync_count.fetch_add(1, Ordering::SeqCst);
319 debounce_sync_count.fetch_add(1, Ordering::SeqCst);
320 }
321 let _ = response_sender.send(());
323 }
324 }
325 }
326 });
327
328 if tokio::runtime::Handle::try_current().is_ok() {
329 if let Some(interval_ms) = policy.interval_ms {
331 let stop = Arc::new(AtomicBool::new(false));
332 let stop_flag = stop.clone();
333 let dirty = Arc::clone(self.get_dirty_blocks());
334 let sync_sender = self.sync_sender.as_ref().unwrap().clone();
335 let mut ticker = tokio::time::interval(Duration::from_millis(interval_ms));
336 let task = tokio::spawn(async move {
337 loop {
338 ticker.tick().await;
339 if stop_flag.load(Ordering::SeqCst) {
340 break;
341 }
342 let needs_sync = {
344 let map = dirty.lock();
345 !map.is_empty()
346 };
347 if needs_sync {
348 log::info!(
349 "Auto-sync (tokio-interval-policy) requesting sync and AWAITING completion"
350 );
351 let (response_sender, response_receiver) =
352 tokio::sync::oneshot::channel();
353 if sync_sender
354 .send(SyncRequest::Timer(response_sender))
355 .is_err()
356 {
357 log::error!(
358 "Failed to send timer sync request - channel closed"
359 );
360 break;
361 } else {
362 let _ = response_receiver.await;
364 log::info!("Auto-sync (tokio-interval-policy) sync COMPLETED");
365 }
366 }
367 }
368 });
369 self.auto_sync_stop = Some(stop);
370 self.tokio_timer_task = Some(task);
371 } else {
372 self.auto_sync_stop = None;
373 }
374
375 if let Some(debounce_ms) = policy.debounce_ms {
376 let stop_flag = self
377 .auto_sync_stop
378 .get_or_insert_with(|| Arc::new(AtomicBool::new(false)))
379 .clone();
380 let dirty = Arc::clone(self.get_dirty_blocks());
381 let last_write = self.last_write_ms.clone();
382 let threshold_flag = self.threshold_hit.clone();
383 let sync_sender = self.sync_sender.as_ref().unwrap().clone();
384 let task = tokio::spawn(async move {
385 let sleep_step = Duration::from_millis(10);
386 loop {
387 if stop_flag.load(Ordering::SeqCst) {
388 break;
389 }
390 if threshold_flag.load(Ordering::SeqCst) {
391 let now = super::BlockStorage::now_millis();
393 let last = last_write.load(Ordering::SeqCst);
394 let elapsed = now.saturating_sub(last);
395 if elapsed >= debounce_ms {
396 let needs_sync = {
397 let map = dirty.lock();
398 !map.is_empty()
399 };
400 if needs_sync {
401 log::info!(
402 "Auto-sync (tokio-debounce) requesting sync after {}ms idle and AWAITING completion",
403 elapsed
404 );
405 let (response_sender, response_receiver) =
406 tokio::sync::oneshot::channel();
407 if sync_sender
408 .send(SyncRequest::Debounce(response_sender))
409 .is_err()
410 {
411 log::error!(
412 "Failed to send debounce sync request - channel closed"
413 );
414 break;
415 } else {
416 let _ = response_receiver.await;
418 log::info!("Auto-sync (tokio-debounce) sync COMPLETED");
419 }
420 }
421 threshold_flag.store(false, Ordering::SeqCst);
422 }
423 }
424 tokio::time::sleep(sleep_step).await;
425 }
426 });
427 self.tokio_debounce_task = Some(task);
428 } else {
429 self.tokio_debounce_task = None;
430 }
431 self.auto_sync_thread = None;
433 self.debounce_thread = None;
434 } else {
435 if let Some(interval_ms) = policy.interval_ms {
437 let stop = Arc::new(AtomicBool::new(false));
438 let stop_thread = stop.clone();
439 let dirty = Arc::clone(self.get_dirty_blocks());
440 let interval = Duration::from_millis(interval_ms);
441 let threshold_flag = self.threshold_hit.clone();
442 let sync_count = self.sync_count.clone();
443 let timer_sync_count = self.timer_sync_count.clone();
444 let last_sync_duration_ms = self.last_sync_duration_ms.clone();
445 let handle = std::thread::spawn(move || {
446 while !stop_thread.load(Ordering::SeqCst) {
447 std::thread::sleep(interval);
448 if stop_thread.load(Ordering::SeqCst) {
449 break;
450 }
451 let mut map = dirty.lock();
452 if !map.is_empty() {
453 let start = Instant::now();
454 let count = map.len();
455 log::info!(
456 "Auto-sync (timer-thread) flushing {} dirty blocks",
457 count
458 );
459 map.clear();
460 threshold_flag.store(false, Ordering::SeqCst);
461 let elapsed = start.elapsed();
462 let ms = elapsed.as_millis() as u64;
463 let ms = if ms == 0 { 1 } else { ms };
464 last_sync_duration_ms.store(ms, Ordering::SeqCst);
465 sync_count.fetch_add(1, Ordering::SeqCst);
466 timer_sync_count.fetch_add(1, Ordering::SeqCst);
467 }
468 }
469 });
470 self.auto_sync_stop = Some(stop);
471 self.auto_sync_thread = Some(handle);
472 } else {
473 self.auto_sync_stop = None;
474 self.auto_sync_thread = None;
475 }
476
477 if let Some(debounce_ms) = policy.debounce_ms {
479 let stop = self
480 .auto_sync_stop
481 .get_or_insert_with(|| Arc::new(AtomicBool::new(false)))
482 .clone();
483 let stop_thread = stop.clone();
484 let dirty = Arc::clone(self.get_dirty_blocks());
485 let last_write = self.last_write_ms.clone();
486 let threshold_flag = self.threshold_hit.clone();
487 let sync_count = self.sync_count.clone();
488 let debounce_sync_count = self.debounce_sync_count.clone();
489 let last_sync_duration_ms = self.last_sync_duration_ms.clone();
490 let handle = std::thread::spawn(move || {
491 let sleep_step = Duration::from_millis(10);
493 loop {
494 if stop_thread.load(Ordering::SeqCst) {
495 break;
496 }
497 if threshold_flag.load(Ordering::SeqCst) {
498 let now = super::BlockStorage::now_millis();
499 let last = last_write.load(Ordering::SeqCst);
500 let elapsed = now.saturating_sub(last);
501 if elapsed >= debounce_ms {
502 let mut map = dirty.lock();
504 if !map.is_empty() {
505 let start = Instant::now();
506 let count = map.len();
507 log::info!(
508 "Auto-sync (debounce-thread) flushing {} dirty blocks after {}ms idle",
509 count,
510 elapsed
511 );
512 map.clear();
513 let d = start.elapsed();
514 let ms = d.as_millis() as u64;
515 let ms = if ms == 0 { 1 } else { ms };
516 last_sync_duration_ms.store(ms, Ordering::SeqCst);
517 }
518 threshold_flag.store(false, Ordering::SeqCst);
519 sync_count.fetch_add(1, Ordering::SeqCst);
520 debounce_sync_count.fetch_add(1, Ordering::SeqCst);
521 }
522 }
523 std::thread::sleep(sleep_step);
524 }
525 });
526 self.debounce_thread = Some(handle);
527 } else {
528 self.debounce_thread = None;
529 }
530 }
531 }
532 }
533
534 #[cfg(target_arch = "wasm32")]
536 pub fn disable_auto_sync(&self) {
537 *lock_mutex!(self.policy) = None;
538 *lock_mutex!(self.auto_sync_interval) = None;
539 log::info!("Auto-sync disabled");
540 }
541
542 #[cfg(not(target_arch = "wasm32"))]
543 pub fn disable_auto_sync(&mut self) {
544 *lock_mutex!(self.auto_sync_interval) = None;
545 log::info!("Auto-sync disabled");
546
547 #[cfg(target_arch = "wasm32")]
548 {
549 super::wasm_auto_sync::unregister_wasm_auto_sync(&self.db_name);
551 }
552
553 #[cfg(not(target_arch = "wasm32"))]
554 {
555 if let Some(stop) = &self.auto_sync_stop {
556 stop.store(true, Ordering::SeqCst);
557 }
558 if let Some(handle) = self.auto_sync_thread.take() {
559 let _ = handle.join();
560 }
561 if let Some(handle) = self.debounce_thread.take() {
562 let _ = handle.join();
563 }
564 if let Some(task) = self.tokio_timer_task.take() {
565 task.abort();
566 }
567 if let Some(task) = self.tokio_debounce_task.take() {
568 task.abort();
569 }
570 self.auto_sync_stop = None;
571 }
572 }
573
574 #[cfg(not(target_arch = "wasm32"))]
576 pub fn get_sync_count(&self) -> u64 {
577 self.sync_count.load(Ordering::SeqCst)
578 }
579
580 #[cfg(not(target_arch = "wasm32"))]
582 pub fn get_timer_sync_count(&self) -> u64 {
583 self.timer_sync_count.load(Ordering::SeqCst)
584 }
585
586 #[cfg(not(target_arch = "wasm32"))]
588 pub fn get_debounce_sync_count(&self) -> u64 {
589 self.debounce_sync_count.load(Ordering::SeqCst)
590 }
591
592 #[cfg(not(target_arch = "wasm32"))]
594 pub fn get_last_sync_duration_ms(&self) -> u64 {
595 self.last_sync_duration_ms.load(Ordering::SeqCst)
596 }
597
598 #[cfg(target_arch = "wasm32")]
599 pub(super) fn maybe_auto_sync(&self) {
600 if let Some(policy) = lock_mutex!(self.policy).clone() {
602 let dirty_count = self.get_dirty_count();
603 let dirty_bytes = dirty_count * super::BLOCK_SIZE;
604
605 if let Some(max_dirty) = policy.max_dirty {
607 if dirty_count >= max_dirty {
608 log::info!(
609 "WASM threshold sync triggered: {} dirty blocks >= {}",
610 dirty_count,
611 max_dirty
612 );
613 let db_name = self.db_name.clone();
615 wasm_bindgen_futures::spawn_local(async move {
616 if let Ok(storage) = super::BlockStorage::new(&db_name).await {
617 if let Err(e) = storage.sync().await {
618 log::error!("WASM threshold sync failed: {}", e.message);
619 }
620 }
621 });
622 return;
623 }
624 }
625
626 if let Some(max_bytes) = policy.max_dirty_bytes {
628 if dirty_bytes >= max_bytes {
629 log::info!(
630 "WASM threshold sync triggered: {} dirty bytes >= {}",
631 dirty_bytes,
632 max_bytes
633 );
634 let db_name = self.db_name.clone();
636 wasm_bindgen_futures::spawn_local(async move {
637 if let Ok(storage) = super::BlockStorage::new(&db_name).await {
638 if let Err(e) = storage.sync().await {
639 log::error!("WASM threshold sync failed: {}", e.message);
640 }
641 }
642 });
643 }
644 }
645 }
646 }
647
648 #[cfg(not(target_arch = "wasm32"))]
649 pub(super) fn maybe_auto_sync(&self) {
650 }
653}