use std::{
fs::{File, create_dir_all},
path::PathBuf,
sync::Arc,
time::Duration,
};
use fred::{
interfaces::KeysInterface,
types::{Expiration, SetOptions},
};
use fs4::fs_std::FileExt;
use tokio::sync::Notify;
use xkv::R;
use crate::{Error, Layout, Result};
const PREFIX: &[u8] = b"sfid:";
fn lock_dir() -> PathBuf {
osid::dir().join("sfid")
}
const HEARTBEAT: Duration = Duration::from_secs(3 * 60);
const EXPIRE: i64 = 10 * 60;
pub struct Pid {
id: u16,
cancel: Arc<Notify>,
_lock: File,
}
impl Pid {
#[inline]
pub fn id(&self) -> u16 {
self.id
}
}
impl Drop for Pid {
fn drop(&mut self) {
self.cancel.notify_one();
}
}
fn local_identity(app: &[u8], max_pid: u32) -> Result<(Box<[u8]>, u16, File)> {
let machine_id = osid::get().map_err(Error::OsId)?;
let dir = lock_dir().join(String::from_utf8_lossy(app).as_ref());
create_dir_all(&dir).map_err(Error::LockFile)?;
for seq in 0..max_pid {
let path = dir.join(seq.to_string());
let file = match File::create(&path) {
Ok(f) => f,
Err(e) => {
log::warn!("create lock file {path:?}: {e}");
continue;
}
};
if file.try_lock_exclusive().is_ok() {
let identity = xbin::concat!(machine_id.as_bytes(), b":", &(seq as u16).to_le_bytes());
return Ok((identity.into(), seq as u16, file));
}
}
Err(Error::NoAvailablePid(max_pid))
}
fn pid_from_key(key: &[u8]) -> u16 {
let len = key.len();
u16::from_le_bytes([key[len - 2], key[len - 1]])
}
pub async fn allocate<L: Layout>(app: impl AsRef<[u8]>) -> Result<Pid> {
let app = app.as_ref();
let max_pid = L::MAX_PID;
let (local, local_seq, lock_file) = local_identity(app, max_pid)?;
let prefix = xbin::concat!(PREFIX, app, b":");
let start = local_seq as u32;
for i in 0..max_pid {
let id = ((start + i) % max_pid) as u16;
let key = xbin::concat!(&*prefix, &id.to_le_bytes());
let old: Option<Vec<u8>> = R
.set(
&*key,
&*local,
Some(Expiration::EX(EXPIRE)),
Some(SetOptions::NX),
true,
)
.await?;
match old {
None => {
if i > 16 {
let app = String::from_utf8_lossy(app);
log::info!("[{app}] pid allocated after {i} attempts");
}
return Ok(start_heartbeat(key.into(), local, lock_file));
}
Some(v) if *v == *local => return Ok(start_heartbeat(key.into(), local, lock_file)),
_ => {}
}
}
Err(Error::NoAvailablePid(max_pid))
}
fn start_heartbeat(key: Box<[u8]>, local: Box<[u8]>, lock_file: File) -> Pid {
let id = pid_from_key(&key);
let cancel = Arc::new(Notify::new());
let notify = cancel.clone();
tokio::spawn(async move {
loop {
tokio::select! {
_ = notify.notified() => break,
_ = tokio::time::sleep(HEARTBEAT) => {
if let Err(e) = R
.set::<(), _, _>(&*key, &*local, Some(Expiration::EX(EXPIRE)), None, false)
.await
{
log::error!("heartbeat set: {e}");
}
}
}
}
});
Pid {
id,
cancel,
_lock: lock_file,
}
}