absurder_sql/storage/
auto_sync.rs1#[cfg(not(target_arch = "wasm32"))]
2use std::sync::atomic::{AtomicBool, Ordering};
3#[cfg(not(target_arch = "wasm32"))]
4use std::sync::Arc;
5use std::time::Duration;
6#[cfg(not(target_arch = "wasm32"))]
7use std::time::Instant;
8#[cfg(not(target_arch = "wasm32"))]
9use tokio::sync::mpsc;
10use crate::storage::SyncPolicy;
11#[cfg(not(target_arch = "wasm32"))]
12use super::block_storage::SyncRequest;
13
14impl super::BlockStorage {
15 pub fn enable_auto_sync(&mut self, interval_ms: u64) {
17 self.auto_sync_interval = Some(Duration::from_millis(interval_ms));
18 #[cfg(not(target_arch = "wasm32"))]
19 {
20 self.last_auto_sync = Instant::now();
21 }
22 self.policy = Some(SyncPolicy { interval_ms: Some(interval_ms), max_dirty: None, max_dirty_bytes: None, debounce_ms: None, verify_after_write: false });
23 log::info!("Auto-sync enabled: every {} ms", interval_ms);
24
25 #[cfg(target_arch = "wasm32")]
26 {
27 super::wasm_auto_sync::register_wasm_auto_sync(&self.db_name);
30 }
31
32 #[cfg(not(target_arch = "wasm32"))]
33 {
34 if let Some(stop) = &self.auto_sync_stop { stop.store(true, Ordering::SeqCst); }
36 if let Some(handle) = self.auto_sync_thread.take() { let _ = handle.join(); }
37 if let Some(handle) = self.debounce_thread.take() { let _ = handle.join(); }
38 if let Some(task) = self.tokio_timer_task.take() { task.abort(); }
39 if let Some(task) = self.tokio_debounce_task.take() { task.abort(); }
40
41 let (sender, mut receiver) = mpsc::unbounded_channel();
43 let dirty_blocks = Arc::clone(self.get_dirty_blocks());
44 let sync_count = self.sync_count.clone();
45 let timer_sync_count = self.timer_sync_count.clone();
46 let debounce_sync_count = self.debounce_sync_count.clone();
47 let last_sync_duration_ms = self.last_sync_duration_ms.clone();
48
49 tokio::spawn(async move {
51 while let Some(request) = receiver.recv().await {
52 match request {
53 SyncRequest::Timer(response_sender) => {
54 if !dirty_blocks.lock().is_empty() {
55 let start = std::time::Instant::now();
57 dirty_blocks.lock().clear();
58 let elapsed = start.elapsed().as_millis() as u64;
59 let elapsed = if elapsed == 0 { 1 } else { elapsed };
60 last_sync_duration_ms.store(elapsed, Ordering::SeqCst);
61 sync_count.fetch_add(1, Ordering::SeqCst);
62 timer_sync_count.fetch_add(1, Ordering::SeqCst);
63 }
64 let _ = response_sender.send(());
66 },
67 SyncRequest::Debounce(response_sender) => {
68 if !dirty_blocks.lock().is_empty() {
69 let start = std::time::Instant::now();
71 dirty_blocks.lock().clear();
72 let elapsed = start.elapsed().as_millis() as u64;
73 let elapsed = if elapsed == 0 { 1 } else { elapsed };
74 last_sync_duration_ms.store(elapsed, Ordering::SeqCst);
75 sync_count.fetch_add(1, Ordering::SeqCst);
76 debounce_sync_count.fetch_add(1, Ordering::SeqCst);
77 }
78 let _ = response_sender.send(());
80 },
81 }
82 }
83 });
84
85 self.sync_sender = Some(sender);
86 self.sync_receiver = None; if tokio::runtime::Handle::try_current().is_ok() {
90 let stop = Arc::new(AtomicBool::new(false));
91 let stop_flag = stop.clone();
92 let dirty = Arc::clone(self.get_dirty_blocks());
93 let sync_sender = self.sync_sender.as_ref().unwrap().clone();
94 let mut ticker = tokio::time::interval(Duration::from_millis(interval_ms));
95 let task = tokio::spawn(async move {
97 loop {
98 ticker.tick().await;
99 if stop_flag.load(Ordering::SeqCst) { break; }
100 let needs_sync = {
102 let map = dirty.lock();
103 !map.is_empty()
104 };
105 if needs_sync {
106 log::info!("Auto-sync (tokio-interval) requesting sync and AWAITING completion");
107 let (response_sender, response_receiver) = tokio::sync::oneshot::channel();
108 if let Err(_) = sync_sender.send(SyncRequest::Timer(response_sender)) {
109 log::error!("Failed to send timer sync request - channel closed");
110 break;
111 } else {
112 let _ = response_receiver.await;
114 log::info!("Auto-sync (tokio-interval) sync COMPLETED");
115 }
116 } else {
117 log::debug!("Auto-sync (tokio-interval) - no dirty blocks, skipping sync request");
118 }
119 }
120 });
121 self.auto_sync_stop = Some(stop);
122 self.tokio_timer_task = Some(task);
123 self.auto_sync_thread = None;
124 self.debounce_thread = None;
125 } else {
126 let stop = Arc::new(AtomicBool::new(false));
128 let stop_flag = stop.clone();
129 let dirty = Arc::clone(self.get_dirty_blocks());
130 let sync_sender = self.sync_sender.as_ref().unwrap().clone();
131 let interval = Duration::from_millis(interval_ms);
132 let handle = tokio::task::spawn_blocking(move || {
133 while !stop_flag.load(Ordering::SeqCst) {
134 std::thread::sleep(interval);
135 if stop_flag.load(Ordering::SeqCst) { break; }
136 let needs_sync = {
137 let map = dirty.lock();
138 !map.is_empty()
139 };
140 if needs_sync {
141 log::info!("Auto-sync (blocking-thread) requesting sync and AWAITING completion");
142 let (response_sender, response_receiver) = tokio::sync::oneshot::channel();
143 if sync_sender.send(SyncRequest::Timer(response_sender)).is_err() {
144 log::error!("Failed to send timer sync request - channel closed");
145 break;
146 } else {
147 let _ = tokio::runtime::Handle::current().block_on(response_receiver);
149 log::info!("Auto-sync (blocking-thread) sync COMPLETED");
150 }
151 }
152 }
153 });
154 self.auto_sync_stop = Some(stop);
155 self.tokio_timer_task = Some(handle); self.auto_sync_thread = None;
157 self.debounce_thread = None;
158 }
159 }
160 }
161
162 pub fn enable_auto_sync_with_policy(&mut self, policy: SyncPolicy) {
164 self.policy = Some(policy.clone());
165 #[cfg(not(target_arch = "wasm32"))]
166 {
167 self.last_auto_sync = Instant::now();
168 }
169 self.auto_sync_interval = policy.interval_ms.map(Duration::from_millis);
170 log::info!("Auto-sync policy enabled: interval={:?}, max_dirty={:?}, max_bytes={:?}", policy.interval_ms, policy.max_dirty, policy.max_dirty_bytes);
171
172 #[cfg(target_arch = "wasm32")]
173 {
174 super::wasm_auto_sync::register_wasm_auto_sync(&self.db_name);
177 }
178
179 #[cfg(not(target_arch = "wasm32"))]
180 {
181 if let Some(stop) = &self.auto_sync_stop { stop.store(true, Ordering::SeqCst); }
183 if let Some(handle) = self.auto_sync_thread.take() { let _ = handle.join(); }
184 if let Some(handle) = self.debounce_thread.take() { let _ = handle.join(); }
185 if let Some(task) = self.tokio_timer_task.take() { task.abort(); }
186 if let Some(task) = self.tokio_debounce_task.take() { task.abort(); }
187
188 let (sender, mut receiver) = mpsc::unbounded_channel();
190 self.sync_sender = Some(sender);
191 self.sync_receiver = None; let dirty_blocks = Arc::clone(self.get_dirty_blocks());
195 let sync_count = self.sync_count.clone();
196 let timer_sync_count = self.timer_sync_count.clone();
197 let debounce_sync_count = self.debounce_sync_count.clone();
198 let last_sync_duration_ms = self.last_sync_duration_ms.clone();
199 let threshold_hit = self.threshold_hit.clone();
200
201 tokio::spawn(async move {
203 while let Some(request) = receiver.recv().await {
204 match request {
205 SyncRequest::Timer(response_sender) => {
206 if !dirty_blocks.lock().is_empty() {
207 let start = std::time::Instant::now();
209 dirty_blocks.lock().clear();
210 threshold_hit.store(false, Ordering::SeqCst);
211 let elapsed = start.elapsed().as_millis() as u64;
212 let elapsed = if elapsed == 0 { 1 } else { elapsed };
213 last_sync_duration_ms.store(elapsed, Ordering::SeqCst);
214 sync_count.fetch_add(1, Ordering::SeqCst);
215 timer_sync_count.fetch_add(1, Ordering::SeqCst);
216 }
217 let _ = response_sender.send(());
219 },
220 SyncRequest::Debounce(response_sender) => {
221 if !dirty_blocks.lock().is_empty() {
222 let start = std::time::Instant::now();
224 dirty_blocks.lock().clear();
225 threshold_hit.store(false, Ordering::SeqCst);
226 let elapsed = start.elapsed().as_millis() as u64;
227 let elapsed = if elapsed == 0 { 1 } else { elapsed };
228 last_sync_duration_ms.store(elapsed, Ordering::SeqCst);
229 sync_count.fetch_add(1, Ordering::SeqCst);
230 debounce_sync_count.fetch_add(1, Ordering::SeqCst);
231 }
232 let _ = response_sender.send(());
234 },
235 }
236 }
237 });
238
239 if tokio::runtime::Handle::try_current().is_ok() {
240 if let Some(interval_ms) = policy.interval_ms {
242 let stop = Arc::new(AtomicBool::new(false));
243 let stop_flag = stop.clone();
244 let dirty = Arc::clone(self.get_dirty_blocks());
245 let sync_sender = self.sync_sender.as_ref().unwrap().clone();
246 let mut ticker = tokio::time::interval(Duration::from_millis(interval_ms));
247 let task = tokio::spawn(async move {
248 loop {
249 ticker.tick().await;
250 if stop_flag.load(Ordering::SeqCst) { break; }
251 let needs_sync = {
253 let map = dirty.lock();
254 !map.is_empty()
255 };
256 if needs_sync {
257 log::info!("Auto-sync (tokio-interval-policy) requesting sync and AWAITING completion");
258 let (response_sender, response_receiver) = tokio::sync::oneshot::channel();
259 if let Err(_) = sync_sender.send(SyncRequest::Timer(response_sender)) {
260 log::error!("Failed to send timer sync request - channel closed");
261 break;
262 } else {
263 let _ = response_receiver.await;
265 log::info!("Auto-sync (tokio-interval-policy) sync COMPLETED");
266 }
267 }
268 }
269 });
270 self.auto_sync_stop = Some(stop);
271 self.tokio_timer_task = Some(task);
272 } else {
273 self.auto_sync_stop = None;
274 }
275
276 if let Some(debounce_ms) = policy.debounce_ms {
277 let stop_flag = self.auto_sync_stop.get_or_insert_with(|| Arc::new(AtomicBool::new(false))).clone();
278 let dirty = Arc::clone(self.get_dirty_blocks());
279 let last_write = self.last_write_ms.clone();
280 let threshold_flag = self.threshold_hit.clone();
281 let sync_sender = self.sync_sender.as_ref().unwrap().clone();
282 let task = tokio::spawn(async move {
283 let sleep_step = Duration::from_millis(10);
284 loop {
285 if stop_flag.load(Ordering::SeqCst) { break; }
286 if threshold_flag.load(Ordering::SeqCst) {
287 let now = super::BlockStorage::now_millis();
289 let last = last_write.load(Ordering::SeqCst);
290 let elapsed = now.saturating_sub(last);
291 if elapsed >= debounce_ms {
292 let needs_sync = {
293 let map = dirty.lock();
294 !map.is_empty()
295 };
296 if needs_sync {
297 log::info!("Auto-sync (tokio-debounce) requesting sync after {}ms idle and AWAITING completion", elapsed);
298 let (response_sender, response_receiver) = tokio::sync::oneshot::channel();
299 if let Err(_) = sync_sender.send(SyncRequest::Debounce(response_sender)) {
300 log::error!("Failed to send debounce sync request - channel closed");
301 break;
302 } else {
303 let _ = response_receiver.await;
305 log::info!("Auto-sync (tokio-debounce) sync COMPLETED");
306 }
307 }
308 threshold_flag.store(false, Ordering::SeqCst);
309 }
310 }
311 tokio::time::sleep(sleep_step).await;
312 }
313 });
314 self.tokio_debounce_task = Some(task);
315 } else {
316 self.tokio_debounce_task = None;
317 }
318 self.auto_sync_thread = None;
320 self.debounce_thread = None;
321 } else {
322 if let Some(interval_ms) = policy.interval_ms {
324 let stop = Arc::new(AtomicBool::new(false));
325 let stop_thread = stop.clone();
326 let dirty = Arc::clone(self.get_dirty_blocks());
327 let interval = Duration::from_millis(interval_ms);
328 let threshold_flag = self.threshold_hit.clone();
329 let sync_count = self.sync_count.clone();
330 let timer_sync_count = self.timer_sync_count.clone();
331 let last_sync_duration_ms = self.last_sync_duration_ms.clone();
332 let handle = std::thread::spawn(move || {
333 while !stop_thread.load(Ordering::SeqCst) {
334 std::thread::sleep(interval);
335 if stop_thread.load(Ordering::SeqCst) { break; }
336 let mut map = dirty.lock();
337 if !map.is_empty() {
338 let start = Instant::now();
339 let count = map.len();
340 log::info!("Auto-sync (timer-thread) flushing {} dirty blocks", count);
341 map.clear();
342 threshold_flag.store(false, Ordering::SeqCst);
343 let elapsed = start.elapsed();
344 let ms = elapsed.as_millis() as u64;
345 let ms = if ms == 0 { 1 } else { ms };
346 last_sync_duration_ms.store(ms, Ordering::SeqCst);
347 sync_count.fetch_add(1, Ordering::SeqCst);
348 timer_sync_count.fetch_add(1, Ordering::SeqCst);
349 }
350 }
351 });
352 self.auto_sync_stop = Some(stop);
353 self.auto_sync_thread = Some(handle);
354 } else {
355 self.auto_sync_stop = None;
356 self.auto_sync_thread = None;
357 }
358
359 if let Some(debounce_ms) = policy.debounce_ms {
361 let stop = self.auto_sync_stop.get_or_insert_with(|| Arc::new(AtomicBool::new(false))).clone();
362 let stop_thread = stop.clone();
363 let dirty = Arc::clone(self.get_dirty_blocks());
364 let last_write = self.last_write_ms.clone();
365 let threshold_flag = self.threshold_hit.clone();
366 let sync_count = self.sync_count.clone();
367 let debounce_sync_count = self.debounce_sync_count.clone();
368 let last_sync_duration_ms = self.last_sync_duration_ms.clone();
369 let handle = std::thread::spawn(move || {
370 let sleep_step = Duration::from_millis(10);
372 loop {
373 if stop_thread.load(Ordering::SeqCst) { break; }
374 if threshold_flag.load(Ordering::SeqCst) {
375 let now = super::BlockStorage::now_millis();
376 let last = last_write.load(Ordering::SeqCst);
377 let elapsed = now.saturating_sub(last);
378 if elapsed >= debounce_ms {
379 let mut map = dirty.lock();
381 if !map.is_empty() {
382 let start = Instant::now();
383 let count = map.len();
384 log::info!("Auto-sync (debounce-thread) flushing {} dirty blocks after {}ms idle", count, elapsed);
385 map.clear();
386 let d = start.elapsed();
387 let ms = d.as_millis() as u64;
388 let ms = if ms == 0 { 1 } else { ms };
389 last_sync_duration_ms.store(ms, Ordering::SeqCst);
390 }
391 threshold_flag.store(false, Ordering::SeqCst);
392 sync_count.fetch_add(1, Ordering::SeqCst);
393 debounce_sync_count.fetch_add(1, Ordering::SeqCst);
394 }
395 }
396 std::thread::sleep(sleep_step);
397 }
398 });
399 self.debounce_thread = Some(handle);
400 } else {
401 self.debounce_thread = None;
402 }
403 }
404 }
405 }
406
407 pub fn disable_auto_sync(&mut self) {
409 self.auto_sync_interval = None;
410 log::info!("Auto-sync disabled");
411
412 #[cfg(target_arch = "wasm32")]
413 {
414 super::wasm_auto_sync::unregister_wasm_auto_sync(&self.db_name);
416 }
417
418 #[cfg(not(target_arch = "wasm32"))]
419 {
420 if let Some(stop) = &self.auto_sync_stop {
421 stop.store(true, Ordering::SeqCst);
422 }
423 if let Some(handle) = self.auto_sync_thread.take() {
424 let _ = handle.join();
425 }
426 if let Some(handle) = self.debounce_thread.take() {
427 let _ = handle.join();
428 }
429 if let Some(task) = self.tokio_timer_task.take() { task.abort(); }
430 if let Some(task) = self.tokio_debounce_task.take() { task.abort(); }
431 self.auto_sync_stop = None;
432 }
433 }
434
435 #[cfg(not(target_arch = "wasm32"))]
437 pub fn get_sync_count(&self) -> u64 {
438 self.sync_count.load(Ordering::SeqCst)
439 }
440
441 #[cfg(not(target_arch = "wasm32"))]
443 pub fn get_timer_sync_count(&self) -> u64 {
444 self.timer_sync_count.load(Ordering::SeqCst)
445 }
446
447 #[cfg(not(target_arch = "wasm32"))]
449 pub fn get_debounce_sync_count(&self) -> u64 {
450 self.debounce_sync_count.load(Ordering::SeqCst)
451 }
452
453 #[cfg(not(target_arch = "wasm32"))]
455 pub fn get_last_sync_duration_ms(&self) -> u64 {
456 self.last_sync_duration_ms.load(Ordering::SeqCst)
457 }
458
/// Starts a background sync on `wasm32` when the policy's dirty thresholds are reached.
///
/// Does nothing without a policy. Otherwise the dirty block count, and the byte count
/// derived from it via `BLOCK_SIZE`, are compared against `max_dirty` and then
/// `max_dirty_bytes`. The first limit that is reached is logged and a single local task is
/// spawned which opens a `BlockStorage` for this database name and calls `sync()` on it.
/// A sync error is logged; a failure to open the storage is silently ignored.
#[cfg(target_arch = "wasm32")]
pub(super) fn maybe_auto_sync(&mut self) {
    let policy = match &self.policy {
        Some(active) => active,
        None => return,
    };

    let dirty_count = self.get_dirty_count();
    let dirty_bytes = dirty_count * super::BLOCK_SIZE;

    // The block-count limit takes precedence over the byte limit; at most one sync is
    // started per call.
    let block_limit = policy.max_dirty.filter(|limit| dirty_count >= *limit);
    let byte_limit = policy.max_dirty_bytes.filter(|limit| dirty_bytes >= *limit);
    match (block_limit, byte_limit) {
        (Some(max_dirty), _) => {
            log::info!("WASM threshold sync triggered: {} dirty blocks >= {}", dirty_count, max_dirty);
        }
        (None, Some(max_bytes)) => {
            log::info!("WASM threshold sync triggered: {} dirty bytes >= {}", dirty_bytes, max_bytes);
        }
        (None, None) => return,
    }

    let db_name = self.db_name.clone();
    wasm_bindgen_futures::spawn_local(async move {
        if let Ok(mut storage) = super::BlockStorage::new(&db_name).await {
            if let Err(e) = storage.sync().await {
                log::error!("WASM threshold sync failed: {}", e.message);
            }
        }
    });
}
500
501 #[cfg(not(target_arch = "wasm32"))]
502 pub(super) fn maybe_auto_sync(&mut self) {
503 }
506}