use dashmap::DashMap;
use serde::Serialize;
use std::time::{Duration, Instant};
const MAX_SESSIONS: usize = 256;
const SESSION_TTL: Duration = Duration::from_secs(60 * 60);
pub struct SharedState {
pub sessions: DashMap<String, CrawlSession>,
}
impl SharedState {
pub fn new() -> Self {
Self {
sessions: DashMap::new(),
}
}
pub fn evict_stale_sessions(&self) {
let now = Instant::now();
let mut expired: Vec<String> = Vec::new();
let mut terminal: Vec<(String, Instant)> = Vec::new();
for entry in self.sessions.iter() {
let session = entry.value();
if matches!(
session.status,
CrawlSessionStatus::Complete | CrawlSessionStatus::Failed
) {
if now.duration_since(session.started_at) >= SESSION_TTL {
expired.push(entry.key().clone());
} else {
terminal.push((entry.key().clone(), session.started_at));
}
}
}
for key in &expired {
self.sessions.remove(key);
}
let len = self.sessions.len();
if len >= MAX_SESSIONS {
let overflow = len + 1 - MAX_SESSIONS;
terminal.sort_by_key(|(_, started_at)| *started_at); for (key, _) in terminal.into_iter().take(overflow) {
self.sessions.remove(&key);
}
}
}
}
#[derive(Serialize)]
pub struct CrawlSession {
pub status: CrawlSessionStatus,
pub pages: Vec<CrawlPageResult>,
#[serde(skip)]
pub started_at: Instant,
}
#[derive(Serialize, Clone, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum CrawlSessionStatus {
Running,
Complete,
#[allow(dead_code)]
Failed,
}
#[derive(Serialize, Clone)]
pub struct CrawlPageResult {
pub url: String,
pub content: String,
pub status_code: u16,
pub links: Vec<String>,
}
#[cfg(test)]
mod tests {
use super::*;
fn session(status: CrawlSessionStatus, started_at: Instant) -> CrawlSession {
CrawlSession {
status,
pages: Vec::new(),
started_at,
}
}
#[test]
fn running_sessions_are_never_evicted() {
let state = SharedState::new();
let old = Instant::now()
.checked_sub(SESSION_TTL * 2)
.unwrap_or_else(Instant::now);
state
.sessions
.insert("running".into(), session(CrawlSessionStatus::Running, old));
state.evict_stale_sessions();
assert!(state.sessions.contains_key("running"));
}
#[test]
fn ttl_expired_terminal_sessions_are_dropped() {
let state = SharedState::new();
let old = Instant::now()
.checked_sub(SESSION_TTL + Duration::from_secs(1))
.unwrap_or_else(Instant::now);
state
.sessions
.insert("old".into(), session(CrawlSessionStatus::Complete, old));
state.sessions.insert(
"fresh".into(),
session(CrawlSessionStatus::Complete, Instant::now()),
);
state.evict_stale_sessions();
assert!(!state.sessions.contains_key("old"));
assert!(state.sessions.contains_key("fresh"));
}
#[test]
fn hard_cap_evicts_oldest_terminal_first() {
let state = SharedState::new();
let base = Instant::now()
.checked_sub(Duration::from_secs(MAX_SESSIONS as u64 + 10))
.unwrap_or_else(Instant::now);
for i in 0..MAX_SESSIONS {
let started = base + Duration::from_secs(i as u64);
state.sessions.insert(
format!("s{i}"),
session(CrawlSessionStatus::Complete, started),
);
}
assert_eq!(state.sessions.len(), MAX_SESSIONS);
state.evict_stale_sessions();
assert!(state.sessions.len() < MAX_SESSIONS);
assert!(!state.sessions.contains_key("s0"), "oldest evicted");
assert!(
state
.sessions
.contains_key(&format!("s{}", MAX_SESSIONS - 1)),
"newest retained"
);
}
#[test]
fn running_sessions_survive_cap_pressure() {
let state = SharedState::new();
for i in 0..(MAX_SESSIONS + 5) {
state.sessions.insert(
format!("r{i}"),
session(CrawlSessionStatus::Running, Instant::now()),
);
}
state.evict_stale_sessions();
assert_eq!(state.sessions.len(), MAX_SESSIONS + 5);
}
}