use std::fs::File;
use std::io::{Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
/// Resolves the base directory under which per-user state is stored.
///
/// Lookup order:
/// 1. `XDG_STATE_HOME`, but only when it holds an absolute path;
/// 2. `$HOME/.local/state` when `HOME` is set;
/// 3. the relative path `.pond-state` when neither variable is usable.
pub(crate) fn state_root() -> PathBuf {
    // A relative (or empty) XDG_STATE_HOME is skipped and the HOME lookup is tried instead.
    if let Some(raw) = std::env::var_os("XDG_STATE_HOME") {
        let xdg_dir = PathBuf::from(raw);
        if xdg_dir.is_absolute() {
            return xdg_dir;
        }
    }

    match std::env::var_os("HOME") {
        Some(home) => PathBuf::from(home).join(".local/state"),
        None => PathBuf::from(".pond-state"),
    }
}
/// Returns the `pond` subdirectory of [`state_root`], where the lock and
/// last-sync files in this module are kept.
pub(crate) fn pond_state_dir() -> PathBuf {
    let mut dir = state_root();
    dir.push("pond");
    dir
}
/// Identity of the process holding a sync lock, stored as JSON in the lock file.
///
/// `try_acquire_sync_lock_in` writes one of these right after taking the lock
/// and parses it back from the file when it reports `SyncLockState::Busy`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct SyncLockHolder {
/// Process id of the holder, taken from `std::process::id()` at acquisition.
pub pid: u32,
/// UTC timestamp captured with `Utc::now()` when the lock was acquired.
pub started_at: DateTime<Utc>,
}
/// Result of a non-blocking attempt to take a sync lock.
pub(crate) enum SyncLockState {
/// The caller may proceed and owns the returned guard.
/// NOTE(review): this is also returned when locking fails with an I/O error
/// (the code logs a warning and proceeds), so it does not strictly prove
/// that an OS-level lock is held.
Acquired(SyncLockGuard),
/// The lock attempt would block because the lock is already held. The payload
/// is the holder info parsed from the lock file, or `None` when the file could
/// not be read or did not contain valid JSON.
Busy(Option<SyncLockHolder>),
}
/// Owns the open lock-file handle behind an acquired sync lock.
///
/// Dropping the guard drops the contained `File`; the test in this module
/// shows the same key becoming acquirable again after the guard is dropped.
pub(crate) struct SyncLockGuard {
// Never read; kept only so the handle stays open for as long as the guard lives.
_file: File,
}
/// Tries, without blocking, to take the sync lock for `store_key` inside the
/// default pond state directory.
///
/// # Errors
/// Propagates any error from `try_acquire_sync_lock_in` (failure to create the
/// state directory or to open the lock file).
pub(crate) fn try_acquire_sync_lock(store_key: &str) -> Result<SyncLockState> {
    let state_dir = pond_state_dir();
    try_acquire_sync_lock_in(state_dir.as_path(), store_key)
}
/// Tries, without blocking, to take the lock file `sync-{store_key}.lock` in `dir`.
///
/// * Lock taken: the file is overwritten (best effort) with a JSON
///   [`SyncLockHolder`] describing this process, and `Acquired` is returned.
/// * Lock would block: the file is read back to find out who holds it, and
///   `Busy` is returned with whatever could be parsed (possibly `None`).
/// * Locking fails for any other reason: a warning is logged and `Acquired`
///   is returned anyway, so the sync is not blocked by the failure.
///
/// # Errors
/// Fails only when `dir` cannot be created or the lock file cannot be opened.
fn try_acquire_sync_lock_in(dir: &Path, store_key: &str) -> Result<SyncLockState> {
    std::fs::create_dir_all(dir).with_context(|| format!("failed to create {}", dir.display()))?;

    let lock_path = dir.join(format!("sync-{store_key}.lock"));
    // Opened read+write without truncation: the contents belong to the current
    // holder until we know the lock is ours.
    let mut lock_file = File::options()
        .read(true)
        .write(true)
        .create(true)
        .truncate(false)
        .open(&lock_path)
        .with_context(|| format!("failed to open sync lock {}", lock_path.display()))?;

    let state = match lock_file.try_lock() {
        Err(std::fs::TryLockError::WouldBlock) => {
            // Someone else holds the lock; report who, if the file tells us.
            let current_holder = match std::fs::read_to_string(&lock_path) {
                Ok(text) => serde_json::from_str(&text).ok(),
                Err(_) => None,
            };
            SyncLockState::Busy(current_holder)
        }
        Err(std::fs::TryLockError::Error(error)) => {
            tracing::warn!(
                %error,
                path = %lock_path.display(),
                "sync lock unsupported on this filesystem; proceeding without single-flight"
            );
            SyncLockState::Acquired(SyncLockGuard { _file: lock_file })
        }
        Ok(()) => {
            let me = SyncLockHolder {
                pid: std::process::id(),
                started_at: Utc::now(),
            };
            // Holder info is purely informational, so every step here is best
            // effort and failures are deliberately ignored.
            let _ = lock_file.set_len(0);
            let _ = lock_file.seek(SeekFrom::Start(0));
            if let Ok(bytes) = serde_json::to_vec(&me) {
                let _ = lock_file.write_all(&bytes);
            }
            let _ = lock_file.flush();
            SyncLockState::Acquired(SyncLockGuard { _file: lock_file })
        }
    };

    Ok(state)
}
/// Summary of a finished sync, persisted as JSON by `write_last_sync_in` and
/// loaded by `read_last_sync_in`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct LastSyncRecord {
/// UTC timestamp at which the sync finished.
pub finished_at: DateTime<Utc>,
/// Duration of the sync; presumably in seconds, going by the field name.
pub duration_secs: f64,
/// Count of sessions inserted (assumed to be for this sync run — confirm with the caller).
pub sessions_inserted: u64,
/// Count of messages inserted (assumed to be for this sync run — confirm with the caller).
pub messages_inserted: u64,
/// Whether the sync succeeded or failed.
pub outcome: SyncOutcome,
/// Optional error text. Omitted from the JSON when `None`, and defaults to
/// `None` when the key is missing on read.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
}
/// Final status of a sync, serialized in snake_case (`"ok"` / `"error"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub(crate) enum SyncOutcome {
/// The sync completed successfully.
Ok,
/// The sync failed.
Error,
}
/// Builds the path of the last-sync record for `store_key` inside `dir`:
/// `<dir>/last-sync-<store_key>.json`.
fn last_sync_path(dir: &Path, store_key: &str) -> PathBuf {
    let file_name = format!("last-sync-{store_key}.json");
    let mut record_path = dir.to_path_buf();
    record_path.push(file_name);
    record_path
}
pub(crate) fn write_last_sync(store_key: &str, record: &LastSyncRecord) {
if let Err(error) = write_last_sync_in(&pond_state_dir(), store_key, record) {
tracing::warn!(%error, "failed to write last-sync record");
}
}
/// Serializes `record` as pretty-printed JSON and installs it as the last-sync
/// file for `store_key` inside `dir`.
///
/// The bytes are first written to a sibling `*.json.tmp` file, which is then
/// renamed over the final path, so the final path is only ever replaced by a
/// fully written file.
///
/// # Errors
/// Fails when `dir` cannot be created, the record cannot be serialized, the
/// temporary file cannot be written, or the rename fails.
fn write_last_sync_in(dir: &Path, store_key: &str, record: &LastSyncRecord) -> Result<()> {
    std::fs::create_dir_all(dir).with_context(|| format!("failed to create {}", dir.display()))?;

    let final_path = last_sync_path(dir, store_key);
    let payload = serde_json::to_vec_pretty(record).context("serialize last-sync record")?;

    // `last-sync-<key>.json` -> `last-sync-<key>.json.tmp`
    let staging_path = final_path.with_extension("json.tmp");
    std::fs::write(&staging_path, &payload)
        .with_context(|| format!("failed to write {}", staging_path.display()))?;

    std::fs::rename(&staging_path, &final_path)
        .with_context(|| format!("failed to install {}", final_path.display()))
}
/// Loads the last-sync record for `store_key` from the default pond state
/// directory, returning `None` when it is missing or unreadable.
pub(crate) fn read_last_sync(store_key: &str) -> Option<LastSyncRecord> {
    let state_dir = pond_state_dir();
    read_last_sync_in(state_dir.as_path(), store_key)
}
/// Loads the last-sync record for `store_key` from `dir`.
///
/// Returns `None` both when the file cannot be read (for example before the
/// first sync) and when its contents do not parse as a [`LastSyncRecord`].
fn read_last_sync_in(dir: &Path, store_key: &str) -> Option<LastSyncRecord> {
    let record_path = last_sync_path(dir, store_key);
    match std::fs::read_to_string(record_path) {
        Ok(text) => serde_json::from_str(&text).ok(),
        Err(_) => None,
    }
}
#[cfg(test)]
mod tests {
    #![allow(clippy::expect_used, clippy::unwrap_used)]
    use super::*;

    /// A held lock reports `Busy` with this process as holder, does not affect
    /// other keys, and can be taken again once the guard is dropped.
    #[test]
    fn sync_lock_excludes_a_sibling_and_frees_on_drop() {
        let tmp = tempfile::TempDir::new().unwrap();
        let root = tmp.path();

        let SyncLockState::Acquired(first) = try_acquire_sync_lock_in(root, "k1").unwrap() else {
            panic!("fresh lock must acquire");
        };

        // Second attempt on the same key while `first` is still alive.
        let SyncLockState::Busy(reported) = try_acquire_sync_lock_in(root, "k1").unwrap() else {
            panic!("held lock must report busy");
        };
        let reported = reported.expect("holder info written on acquire");
        assert_eq!(reported.pid, std::process::id());

        // A different key is independent; its temporary guard is released
        // at the end of the statement.
        assert!(matches!(
            try_acquire_sync_lock_in(root, "k2").unwrap(),
            SyncLockState::Acquired(_)
        ));

        drop(first);
        assert!(matches!(
            try_acquire_sync_lock_in(root, "k1").unwrap(),
            SyncLockState::Acquired(_)
        ));
    }

    /// Reading before any write yields `None`; a written record reads back
    /// with the same field values.
    #[test]
    fn last_sync_record_round_trips_and_is_absent_before_first_write() {
        let tmp = tempfile::TempDir::new().unwrap();
        let root = tmp.path();
        assert!(read_last_sync_in(root, "k").is_none());

        let written = LastSyncRecord {
            finished_at: Utc::now(),
            duration_secs: 12.5,
            sessions_inserted: 3,
            messages_inserted: 41,
            outcome: SyncOutcome::Error,
            error: Some(String::from("boom")),
        };
        write_last_sync_in(root, "k", &written).unwrap();

        let loaded = read_last_sync_in(root, "k").expect("record present");
        assert_eq!(loaded.sessions_inserted, 3);
        assert_eq!(loaded.outcome, SyncOutcome::Error);
        assert_eq!(loaded.error.as_deref(), Some("boom"));
    }
}