use std::path::{Path, PathBuf};
use std::sync::Mutex as StdMutex;
use std::time::Duration;
use tokio::sync::mpsc;
use crate::CoreResult;
/// Newtype over `gloo_timers::future::TimeoutFuture` that is explicitly marked `Send`.
///
/// Only compiled for the `wasm32` / `unknown` OS target.
#[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
struct SendTimeoutFuture(gloo_timers::future::TimeoutFuture);

// SAFETY: NOTE(review) — nothing in this file proves the claim. Presumably this is sound because
// the target runs on a single thread, so the timer is never actually moved across threads;
// confirm before enabling any threaded wasm configuration.
#[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
unsafe impl Send for SendTimeoutFuture {}

#[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
impl std::future::Future for SendTimeoutFuture {
    type Output = ();

    /// Forwards the poll to the wrapped timeout future.
    fn poll(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        let inner = &mut self.0;
        std::future::Future::poll(std::pin::Pin::new(inner), cx)
    }
}
/// Handle used to queue byte payloads for a background persistence task.
///
/// Payloads travel over an unbounded channel; `new` sets up the receiving side.
pub struct FilePersistenceWorker {
/// Sending half of the channel fed by `queue`.
tx: mpsc::UnboundedSender<Vec<u8>>,
/// Join handle of the task spawned by `new` (only with the `frb-spawn` feature).
// NOTE(review): never read in the visible code — presumably stored only to keep the
// handle alive for the lifetime of the worker; confirm.
#[cfg(feature = "frb-spawn")]
_task: StdMutex<Option<flutter_rust_bridge::JoinHandle<()>>>,
}
impl FilePersistenceWorker {
    /// Creates a worker whose queued payloads are handed to `worker_loop` for `path`.
    ///
    /// With the `frb-spawn` feature enabled, `worker_loop` is started via
    /// `crate::runtime::spawn` and owns `path`, `min_interval` and the channel receiver.
    ///
    /// NOTE(review): without `frb-spawn` no task is spawned here and the receiver is dropped
    /// when this function returns, so every later `queue` call is a no-op — confirm that is
    /// intended.
    ///
    /// # Errors
    ///
    /// Propagates whatever `crate::runtime::ensure_initialized` returns on failure.
    pub fn new(path: PathBuf, min_interval: Duration) -> CoreResult<Self> {
        crate::runtime::ensure_initialized()?;

        let (sender, mut receiver) = mpsc::unbounded_channel::<Vec<u8>>();

        #[cfg(feature = "frb-spawn")]
        let worker_task = crate::runtime::spawn(async move {
            worker_loop(&path, min_interval, &mut receiver).await;
        });

        let worker = Self {
            tx: sender,
            #[cfg(feature = "frb-spawn")]
            _task: StdMutex::new(Some(worker_task)),
        };
        Ok(worker)
    }

    /// Queues `bytes` for persistence.
    ///
    /// Best effort: a failed send (the receiving side is gone) is ignored and the payload is
    /// dropped.
    pub fn queue(&self, bytes: Vec<u8>) {
        drop(self.tx.send(bytes));
    }
}
/// Native persistence loop: coalesces queued payloads and writes the newest one to `path`,
/// keeping at least `min_interval` between the end of one write and the start of the next.
///
/// Behaviour:
/// - While nothing is pending, the loop simply waits for the next payload.
/// - While a payload is pending, newer payloads replace it until the throttle window has
///   elapsed; then the latest payload is written on a blocking thread via
///   `crate::runtime::spawn_blocking`.
/// - When the channel closes, a pending payload is flushed immediately and the loop ends.
///
/// The outcome of the blocking write is not inspected (best effort).
#[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))]
async fn worker_loop(
    path: &Path,
    min_interval: Duration,
    rx: &mut mpsc::UnboundedReceiver<Vec<u8>>,
) {
    use std::time::Instant;

    let mut pending: Option<Vec<u8>> = None;
    // `None` until the first write has happened, so the first payload is never throttled.
    // (Back-dating an `Instant` by `min_interval` can fail on platforms whose monotonic
    // clock starts near boot, which used to delay the first write by a full interval.)
    let mut last_write: Option<Instant> = None;

    loop {
        let next = if pending.is_some() {
            // Remaining throttle time, computed with saturating arithmetic only:
            // `Instant + Duration` panics on overflow for very large intervals.
            let wait = last_write.map_or(Duration::ZERO, |at| {
                min_interval.saturating_sub(Instant::now().saturating_duration_since(at))
            });
            // Both "timer elapsed" and "channel closed" collapse to `None`; either way the
            // pending payload must be flushed now.
            tokio::time::timeout(wait, rx.recv()).await.unwrap_or(None)
        } else {
            rx.recv().await
        };

        match next {
            // A newer payload supersedes whatever was waiting.
            Some(bytes) => pending = Some(bytes),
            None => match pending.take() {
                Some(bytes) => {
                    let target = path.to_path_buf();
                    let _ = crate::runtime::spawn_blocking(move || {
                        crate::persistence::try_write_bytes_atomic(&target, &bytes);
                    })
                    .await;
                    last_write = Some(Instant::now());
                }
                // Nothing pending and `recv` yielded `None`: the channel is closed and
                // drained, so there is nothing left to persist.
                None => break,
            },
        }
    }
}
/// Wasm persistence loop: coalesces queued payloads and writes the newest one to `path`,
/// keeping roughly `min_interval` between writes.
///
/// Behaviour:
/// - While nothing is pending, the loop waits for the next payload.
/// - While a payload is pending, it races the channel against a timer for the remaining
///   throttle time. The channel is polled first (`select_biased!`), so newer payloads
///   replace the pending one; when the timer fires, the pending payload is written
///   synchronously.
/// - When the channel closes, any pending payload is flushed after the loop.
///
/// Time is read from `now_ms` (wall clock), so the throttle is approximate.
#[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
async fn worker_loop(
    path: &Path,
    min_interval: Duration,
    rx: &mut mpsc::UnboundedReceiver<Vec<u8>>,
) {
    use futures::FutureExt as _;

    // The timer takes a `u32` millisecond delay; clamp before narrowing so the cast is lossless.
    let min_interval_ms = min_interval.as_millis().min(u128::from(u32::MAX)) as u32;
    let mut pending: Option<Vec<u8>> = None;
    // Back-date the last write so the first payload is due immediately.
    let mut last_write_ms = now_ms().saturating_sub(u64::from(min_interval_ms));

    loop {
        if pending.is_none() {
            match rx.recv().await {
                Some(bytes) => pending = Some(bytes),
                None => break,
            }
            continue;
        }

        let due_ms = last_write_ms.saturating_add(u64::from(min_interval_ms));
        // The wall clock may be adjusted backwards, which would make `due_ms - now` far
        // larger than the configured interval. Never wait longer than one interval.
        let remaining_ms =
            due_ms.saturating_sub(now_ms()).min(u64::from(min_interval_ms)) as u32;

        let recv_fut = rx.recv().fuse();
        let sleep_fut =
            SendTimeoutFuture(gloo_timers::future::TimeoutFuture::new(remaining_ms)).fuse();
        futures::pin_mut!(recv_fut, sleep_fut);
        futures::select_biased! {
            value = recv_fut => {
                match value {
                    // A newer payload supersedes the pending one.
                    Some(bytes) => pending = Some(bytes),
                    // Channel closed: leave the loop; the pending payload is flushed below.
                    None => break,
                }
            }
            _ = sleep_fut => {
                if let Some(bytes) = pending.take() {
                    crate::persistence::try_write_bytes_atomic(path, &bytes);
                    last_write_ms = now_ms();
                }
            }
        }
    }

    // Final flush for a payload that was still waiting when the channel closed.
    if let Some(bytes) = pending.take() {
        crate::persistence::try_write_bytes_atomic(path, &bytes);
    }
}
/// Returns the value of `js_sys::Date::now()` as whole, non-negative milliseconds.
///
/// Negative readings are raised to zero before the float-to-integer conversion.
#[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
fn now_ms() -> u64 {
    let wall_clock_ms = js_sys::Date::now();
    let non_negative_ms = wall_clock_ms.max(0.0);
    non_negative_ms as u64
}