tail-fin-daemon 0.5.1

Long-running browser-session daemon for tail-fin (tfd binary). Keeps Chrome tabs warm across invocations via a Unix-socket protocol; registers Site implementations through a runtime Arc<dyn Site> registry.
Documentation
use std::collections::HashMap;
use std::fmt;
use std::sync::{Arc, Mutex};
use std::time::Instant;

/// Pool is keyed by a site identifier and a Chrome host (or "auto").
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct PoolKey {
    pub site: String,
    pub host: String,
}

impl PoolKey {
    pub fn new(site: impl Into<String>, host: impl Into<String>) -> Self {
        Self {
            site: site.into(),
            host: host.into(),
        }
    }
}

impl fmt::Display for PoolKey {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}@{}", self.site, self.host)
    }
}

/// State of a single pool member (a browser session / tab).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PoolState {
    /// Available for a new request.
    Idle,
    /// Held by a long-lived acquire; reaper leaves this alone.
    Acquired,
    /// Currently executing a business command (transient).
    InUse,
}

#[derive(Debug)]
pub struct PoolMember {
    pub session_id: String,
    pub state: PoolState,
    pub last_used: Instant,
}

/// In-memory pool manager: tracks which `session_id`s belong to which
/// `PoolKey`, and their current `PoolState`.
///
/// This does NOT create or destroy browser sessions — that's the caller's job.
/// The caller tells us "here's a new session for key K" (`register`), and
/// we track its state. To acquire, we either return an idle one or tell the
/// caller to create a new one (`None` if pool is at capacity).
#[derive(Clone, Default)]
pub struct PoolManager {
    inner: Arc<Mutex<Inner>>,
}

#[derive(Default)]
struct Inner {
    /// Pool members grouped by key.
    members: HashMap<PoolKey, Vec<PoolMember>>,
    /// Reverse index: session_id → PoolKey.
    by_id: HashMap<String, PoolKey>,
    /// Max size per pool. `0` = unlimited.
    max_per_pool: usize,
}

/// Outcome of `try_acquire` — either we got an idle session, or the caller
/// must create a new one (up to max), or wait.
#[derive(Debug)]
pub enum AcquireOutcome {
    /// Existing idle session picked from the pool. Caller marks it `InUse` or `Acquired` via `mark`.
    Existing(String),
    /// Pool has capacity — caller should create a new session and call `register`.
    CreateNew,
    /// Pool is at max capacity and all members are busy.
    Busy,
}

impl PoolManager {
    pub fn new(max_per_pool: usize) -> Self {
        Self {
            inner: Arc::new(Mutex::new(Inner {
                members: HashMap::new(),
                by_id: HashMap::new(),
                max_per_pool,
            })),
        }
    }

    /// Atomically acquire a session for `key`:
    /// - If an idle session exists, mark it `new_state` under the same lock and
    ///   return `Existing(sid)`.
    /// - If no idle session but pool has capacity, return `CreateNew` (caller
    ///   registers the newly-launched session via `register`).
    /// - If pool is at max and all busy, return `Busy`.
    ///
    /// Unlike `try_acquire`, this closes the check-and-mark window that would
    /// otherwise let two concurrent callers hand out the same session.
    pub fn acquire_atomic(&self, key: &PoolKey, new_state: PoolState) -> AcquireOutcome {
        let mut inner = self.inner.lock().unwrap();
        let max = inner.max_per_pool;
        if let Some(pool) = inner.members.get_mut(key) {
            if let Some(m) = pool.iter_mut().find(|m| m.state == PoolState::Idle) {
                m.state = new_state;
                m.last_used = Instant::now();
                return AcquireOutcome::Existing(m.session_id.clone());
            }
            if max == 0 || pool.len() < max {
                return AcquireOutcome::CreateNew;
            }
            return AcquireOutcome::Busy;
        }
        AcquireOutcome::CreateNew
    }

    /// Try to acquire a session for `key`. Does NOT change state — caller must
    /// call `mark` after a successful `Existing`, or `register` after creating a new one.
    pub fn try_acquire(&self, key: &PoolKey) -> AcquireOutcome {
        let inner = self.inner.lock().unwrap();
        let max = inner.max_per_pool;
        let pool = inner.members.get(key);
        if let Some(pool) = pool {
            if let Some(m) = pool.iter().find(|m| m.state == PoolState::Idle) {
                return AcquireOutcome::Existing(m.session_id.clone());
            }
            if max == 0 || pool.len() < max {
                return AcquireOutcome::CreateNew;
            }
            return AcquireOutcome::Busy;
        }
        // No pool yet — we can always create.
        AcquireOutcome::CreateNew
    }

    /// Register a newly-created session. Initial state is `InUse` so the caller
    /// can immediately use it without a second round-trip.
    pub fn register(&self, key: PoolKey, session_id: String, state: PoolState) {
        let mut inner = self.inner.lock().unwrap();
        inner.by_id.insert(session_id.clone(), key.clone());
        inner.members.entry(key).or_default().push(PoolMember {
            session_id,
            state,
            last_used: Instant::now(),
        });
    }

    /// Update a session's state. Returns `true` if found.
    pub fn mark(&self, session_id: &str, state: PoolState) -> bool {
        let mut inner = self.inner.lock().unwrap();
        let Some(key) = inner.by_id.get(session_id).cloned() else {
            return false;
        };
        let Some(pool) = inner.members.get_mut(&key) else {
            return false;
        };
        if let Some(m) = pool.iter_mut().find(|m| m.session_id == session_id) {
            m.state = state;
            m.last_used = Instant::now();
            true
        } else {
            false
        }
    }

    /// Remove a session from the pool (e.g., because its browser tab died).
    pub fn remove(&self, session_id: &str) -> Option<PoolKey> {
        let mut inner = self.inner.lock().unwrap();
        let key = inner.by_id.remove(session_id)?;
        if let Some(pool) = inner.members.get_mut(&key) {
            pool.retain(|m| m.session_id != session_id);
            if pool.is_empty() {
                inner.members.remove(&key);
            }
        }
        Some(key)
    }

    /// Snapshot of (session_id, pool_key, state, idle_secs) for `session.list`.
    pub fn snapshot(&self) -> Vec<(String, PoolKey, PoolState, u64)> {
        let inner = self.inner.lock().unwrap();
        let mut out = Vec::new();
        for (key, pool) in inner.members.iter() {
            for m in pool.iter() {
                out.push((
                    m.session_id.clone(),
                    key.clone(),
                    m.state,
                    m.last_used.elapsed().as_secs(),
                ));
            }
        }
        out
    }

    /// Lookup the PoolKey for a session_id.
    pub fn key_of(&self, session_id: &str) -> Option<PoolKey> {
        self.inner.lock().unwrap().by_id.get(session_id).cloned()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn pool_key_display() {
        let k = PoolKey::new("sa", "127.0.0.1:9222");
        assert_eq!(k.to_string(), "sa@127.0.0.1:9222");
    }

    #[test]
    fn pool_key_equality() {
        let a = PoolKey::new("sa", "127.0.0.1:9222");
        let b = PoolKey::new("sa", "127.0.0.1:9222");
        let c = PoolKey::new("twitter", "127.0.0.1:9222");
        assert_eq!(a, b);
        assert_ne!(a, c);
    }

    #[test]
    fn empty_pool_says_create_new() {
        let pm = PoolManager::new(3);
        let key = PoolKey::new("sa", "h");
        assert!(matches!(pm.try_acquire(&key), AcquireOutcome::CreateNew));
    }

    #[test]
    fn register_then_acquire_reuses_idle() {
        let pm = PoolManager::new(3);
        let key = PoolKey::new("sa", "h");
        pm.register(key.clone(), "sid-1".into(), PoolState::Idle);
        match pm.try_acquire(&key) {
            AcquireOutcome::Existing(sid) => assert_eq!(sid, "sid-1"),
            other => panic!("expected Existing, got {other:?}"),
        }
    }

    #[test]
    fn in_use_sessions_do_not_match_idle_acquire() {
        let pm = PoolManager::new(3);
        let key = PoolKey::new("sa", "h");
        pm.register(key.clone(), "sid-1".into(), PoolState::InUse);
        // pool has capacity, so we get CreateNew, not Busy
        assert!(matches!(pm.try_acquire(&key), AcquireOutcome::CreateNew));
    }

    #[test]
    fn pool_full_returns_busy() {
        let pm = PoolManager::new(2);
        let key = PoolKey::new("sa", "h");
        pm.register(key.clone(), "sid-1".into(), PoolState::InUse);
        pm.register(key.clone(), "sid-2".into(), PoolState::InUse);
        assert!(matches!(pm.try_acquire(&key), AcquireOutcome::Busy));
    }

    #[test]
    fn unlimited_pool_never_busy() {
        let pm = PoolManager::new(0);
        let key = PoolKey::new("sa", "h");
        for i in 0..10 {
            pm.register(key.clone(), format!("sid-{i}"), PoolState::InUse);
        }
        assert!(matches!(pm.try_acquire(&key), AcquireOutcome::CreateNew));
    }

    #[test]
    fn mark_updates_state() {
        let pm = PoolManager::new(3);
        let key = PoolKey::new("sa", "h");
        pm.register(key.clone(), "sid-1".into(), PoolState::InUse);
        assert!(pm.mark("sid-1", PoolState::Idle));
        assert!(matches!(
            pm.try_acquire(&key),
            AcquireOutcome::Existing(s) if s == "sid-1"
        ));
    }

    #[test]
    fn remove_drops_member_and_empty_pool() {
        let pm = PoolManager::new(3);
        let key = PoolKey::new("sa", "h");
        pm.register(key.clone(), "sid-1".into(), PoolState::Idle);
        assert_eq!(pm.remove("sid-1"), Some(key.clone()));
        assert!(matches!(pm.try_acquire(&key), AcquireOutcome::CreateNew));
        assert_eq!(pm.key_of("sid-1"), None);
    }

    #[test]
    fn snapshot_lists_all_members() {
        let pm = PoolManager::new(3);
        let k1 = PoolKey::new("sa", "h");
        let k2 = PoolKey::new("twitter", "h");
        pm.register(k1.clone(), "s1".into(), PoolState::Idle);
        pm.register(k2.clone(), "s2".into(), PoolState::InUse);
        let snap = pm.snapshot();
        assert_eq!(snap.len(), 2);
    }

    #[test]
    fn acquire_atomic_flips_state() {
        let pm = PoolManager::new(3);
        let key = PoolKey::new("sa", "h");
        pm.register(key.clone(), "sid-1".into(), PoolState::Idle);
        match pm.acquire_atomic(&key, PoolState::InUse) {
            AcquireOutcome::Existing(sid) => assert_eq!(sid, "sid-1"),
            other => panic!("expected Existing, got {other:?}"),
        }
        // Second call must NOT see sid-1 as idle anymore.
        assert!(matches!(
            pm.acquire_atomic(&key, PoolState::InUse),
            AcquireOutcome::CreateNew
        ));
    }

    #[test]
    fn snapshot_includes_state_per_member() {
        let pm = PoolManager::new(3);
        let key = PoolKey::new("sa", "h");
        pm.register(key.clone(), "a".into(), PoolState::Idle);
        pm.register(key.clone(), "b".into(), PoolState::Acquired);
        pm.register(key.clone(), "c".into(), PoolState::InUse);

        let snap = pm.snapshot();
        let acquired: Vec<_> = snap
            .iter()
            .filter(|(_, _, s, _)| *s == PoolState::Acquired)
            .map(|(sid, _, _, _)| sid.as_str())
            .collect();
        assert_eq!(acquired, vec!["b"]);
    }

    #[test]
    fn acquire_atomic_concurrent_callers_see_different_outcomes() {
        use std::sync::Barrier;
        use std::thread;

        let pm = PoolManager::new(3);
        let key = PoolKey::new("sa", "h");
        pm.register(key.clone(), "sid-1".into(), PoolState::Idle);

        let barrier = std::sync::Arc::new(Barrier::new(2));
        let pm_a = pm.clone();
        let pm_b = pm.clone();
        let key_a = key.clone();
        let key_b = key.clone();
        let b_a = barrier.clone();
        let b_b = barrier.clone();

        let t_a = thread::spawn(move || {
            b_a.wait();
            pm_a.acquire_atomic(&key_a, PoolState::InUse)
        });
        let t_b = thread::spawn(move || {
            b_b.wait();
            pm_b.acquire_atomic(&key_b, PoolState::InUse)
        });

        let out_a = t_a.join().unwrap();
        let out_b = t_b.join().unwrap();

        // Exactly one thread gets Existing("sid-1"), the other gets CreateNew.
        let existing_count = [&out_a, &out_b]
            .iter()
            .filter(|o| matches!(o, AcquireOutcome::Existing(_)))
            .count();
        let create_count = [&out_a, &out_b]
            .iter()
            .filter(|o| matches!(o, AcquireOutcome::CreateNew))
            .count();
        assert_eq!(existing_count, 1, "exactly one should get Existing");
        assert_eq!(create_count, 1, "exactly one should get CreateNew");
    }
}