use std::fmt::{self, Display, Formatter};
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
use arc_swap::ArcSwap;
use tracing::{debug, info, warn};
use super::browser::BrowserKind;
use super::version::DriverVersion;
/// Identifies which kind of version request produced a resolved driver version.
///
/// Carried by `Status::DriverVersionResolved` and rendered in its message as
/// `(source: ...)`. The enum is `#[non_exhaustive]`, so matches outside this
/// crate need a wildcard arm.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VersionSource {
/// Produced from a `DriverVersion::MatchLocalBrowser` request; displayed as `local-browser`.
LocalBrowser,
/// Produced from a `DriverVersion::Latest` request; displayed as `latest`.
Latest,
/// Produced from a `DriverVersion::Exact(_)` request; displayed as `exact`.
Exact,
/// Produced from a `DriverVersion::FromCapabilities` request; displayed as `capabilities`.
Capabilities,
}
impl Display for VersionSource {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
let s = match self {
VersionSource::LocalBrowser => "local-browser",
VersionSource::Latest => "latest",
VersionSource::Exact => "exact",
VersionSource::Capabilities => "capabilities",
};
f.write_str(s)
}
}
impl VersionSource {
    /// Maps a requested [`DriverVersion`] spec to the matching source tag.
    ///
    /// The payload of `DriverVersion::Exact` is ignored; only the variant
    /// decides the result.
    pub(crate) fn from_spec(spec: &DriverVersion) -> Self {
        match spec {
            DriverVersion::Exact(_) => Self::Exact,
            DriverVersion::Latest => Self::Latest,
            DriverVersion::MatchLocalBrowser => Self::LocalBrowser,
            DriverVersion::FromCapabilities => Self::Capabilities,
        }
    }
}
/// A progress/lifecycle event, rendered as a one-line message by its `Display` impl.
///
/// Every variant except `DriverDownloadRetry` carries the `BrowserKind`, which the
/// `Display` impl turns into a driver-name prefix (e.g. `chromedriver`). The enum is
/// `#[non_exhaustive]`, so matches outside this crate need a wildcard arm.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub enum Status {
/// Displayed as `<driver>: resolved browser kind`.
BrowserKindResolved {
browser: BrowserKind,
},
/// Displayed as `<driver>: detected local browser <version> at <binary>`.
LocalBrowserDetected {
browser: BrowserKind,
/// Version string shown in the message.
version: String,
/// Path shown after `at` in the message.
binary: PathBuf,
},
/// Displayed as `<driver>: resolving driver version (requested: <spec>)`.
DriverVersionResolving {
browser: BrowserKind,
/// The version spec that was requested.
requested: DriverVersion,
},
/// Displayed as `<driver> <version>: resolved (source: <source>)`.
DriverVersionResolved {
browser: BrowserKind,
version: String,
/// Which kind of request the version came from.
source: VersionSource,
},
/// Displayed as `<driver> <version>: cache hit at <path>`.
DriverCacheHit {
browser: BrowserKind,
version: String,
path: PathBuf,
},
/// Displayed as `<driver> <version>: download started (<url>)`.
DriverDownloadStarted {
browser: BrowserKind,
version: String,
url: String,
},
/// Displayed as `driver download retry (attempt <attempt>): <error>`.
///
/// The only variant without a `browser` field, so its message has no driver prefix.
DriverDownloadRetry {
attempt: u32,
error: String,
},
/// Displayed as `<driver> <version>: download complete (<size> in <time>)`.
DriverDownloadComplete {
browser: BrowserKind,
version: String,
/// Byte count; rendered in human-readable units in the message.
bytes: u64,
/// Rendered as seconds or milliseconds in the message.
duration: Duration,
},
/// Displayed as `<driver>: archive extracted to <path>`.
DriverArchiveExtracted {
browser: BrowserKind,
path: PathBuf,
},
/// Displayed as `<driver> <version>: process spawned pid=<pid> port=<port> binary=<binary>`.
DriverProcessSpawned {
browser: BrowserKind,
version: String,
pid: u32,
port: u16,
binary: PathBuf,
},
/// Displayed as `<driver> <version>: ready at <url> in <elapsed>`.
DriverReady {
browser: BrowserKind,
version: String,
url: String,
/// Rendered as seconds or milliseconds in the message.
elapsed: Duration,
},
/// Displayed as `<driver> <version>: reused live driver at <url>`.
DriverReused {
browser: BrowserKind,
version: String,
url: String,
},
/// Displayed as `<driver>: starting session at <url>`.
SessionStarting {
browser: BrowserKind,
url: String,
},
/// Displayed as `<driver>: session <id> started at <url>`.
SessionStarted {
browser: BrowserKind,
/// Full session id; the message shows only its first 8 characters.
session_id: String,
url: String,
},
/// Displayed as `<driver>: session <id> ended`.
SessionEnded {
browser: BrowserKind,
/// Full session id; the message shows only its first 8 characters.
session_id: String,
},
/// Displayed as `<driver> <version>: shutdown (port <port>)`.
DriverShutdown {
browser: BrowserKind,
version: String,
port: u16,
},
}
impl Status {
    /// Returns the driver name used as the prefix of status messages for
    /// the given browser kind.
    fn driver_label(browser: BrowserKind) -> &'static str {
        let name: &'static str = match browser {
            BrowserKind::Safari => "safaridriver",
            BrowserKind::Edge => "msedgedriver",
            BrowserKind::Firefox => "geckodriver",
            BrowserKind::Chrome => "chromedriver",
        };
        name
    }
}
impl Display for Status {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
Status::BrowserKindResolved {
browser,
} => {
write!(f, "{}: resolved browser kind", Self::driver_label(*browser))
}
Status::LocalBrowserDetected {
browser,
version,
binary,
} => write!(
f,
"{}: detected local browser {version} at {}",
Self::driver_label(*browser),
binary.display()
),
Status::DriverVersionResolving {
browser,
requested,
} => write!(
f,
"{}: resolving driver version (requested: {})",
Self::driver_label(*browser),
describe_spec(requested),
),
Status::DriverVersionResolved {
browser,
version,
source,
} => {
write!(f, "{} {version}: resolved (source: {source})", Self::driver_label(*browser),)
}
Status::DriverCacheHit {
browser,
version,
path,
} => write!(
f,
"{} {version}: cache hit at {}",
Self::driver_label(*browser),
path.display()
),
Status::DriverDownloadStarted {
browser,
version,
url,
} => write!(f, "{} {version}: download started ({url})", Self::driver_label(*browser)),
Status::DriverDownloadRetry {
attempt,
error,
} => {
write!(f, "driver download retry (attempt {attempt}): {error}")
}
Status::DriverDownloadComplete {
browser,
version,
bytes,
duration,
} => write!(
f,
"{} {version}: download complete ({} in {})",
Self::driver_label(*browser),
human_bytes(*bytes),
human_duration(*duration),
),
Status::DriverArchiveExtracted {
browser,
path,
} => write!(
f,
"{}: archive extracted to {}",
Self::driver_label(*browser),
path.display()
),
Status::DriverProcessSpawned {
browser,
version,
pid,
port,
binary,
} => write!(
f,
"{} {version}: process spawned pid={pid} port={port} binary={}",
Self::driver_label(*browser),
binary.display()
),
Status::DriverReady {
browser,
version,
url,
elapsed,
} => write!(
f,
"{} {version}: ready at {url} in {}",
Self::driver_label(*browser),
human_duration(*elapsed)
),
Status::DriverReused {
browser,
version,
url,
} => {
write!(f, "{} {version}: reused live driver at {url}", Self::driver_label(*browser))
}
Status::SessionStarting {
browser,
url,
} => {
write!(f, "{}: starting session at {url}", Self::driver_label(*browser))
}
Status::SessionStarted {
browser,
session_id,
url,
} => write!(
f,
"{}: session {} started at {url}",
Self::driver_label(*browser),
short_id(session_id)
),
Status::SessionEnded {
browser,
session_id,
} => write!(
f,
"{}: session {} ended",
Self::driver_label(*browser),
short_id(session_id)
),
Status::DriverShutdown {
browser,
version,
port,
} => write!(f, "{} {version}: shutdown (port {port})", Self::driver_label(*browser)),
}
}
}
/// Produces the short text used for a requested version spec in status messages.
///
/// Unit variants map to fixed labels; `Exact(v)` becomes `exact <v>`.
fn describe_spec(spec: &DriverVersion) -> String {
    let fixed = match spec {
        // The only variant whose text depends on its payload.
        DriverVersion::Exact(v) => return format!("exact {v}"),
        DriverVersion::Latest => "latest",
        DriverVersion::MatchLocalBrowser => "match-local-browser",
        DriverVersion::FromCapabilities => "from-capabilities",
    };
    fixed.to_string()
}
/// Returns at most the first eight characters of `id`.
///
/// The cut is made on a `char` boundary, so multi-byte input never causes a
/// slicing panic. Ids of eight characters or fewer are returned unchanged.
fn short_id(id: &str) -> &str {
    // The byte offset of the ninth character, if any, is where the prefix ends.
    match id.char_indices().nth(8) {
        Some((cut, _)) => &id[..cut],
        None => id,
    }
}
/// Formats a byte count using binary units (`B`, `KiB`, `MiB`, `GiB`).
///
/// Counts below 1024 are printed exactly (`"512 B"`); larger counts are
/// printed with one decimal place. `GiB` is the largest unit, so very large
/// counts stay in `GiB`.
///
/// The unit is chosen from the *rounded* value: a count just below a unit
/// boundary (e.g. `1_048_575`) is shown as `"1.0 MiB"` rather than
/// `"1024.0 KiB"`.
fn human_bytes(bytes: u64) -> String {
    const STEP: f64 = 1024.0;
    const UNITS: [&str; 3] = ["KiB", "MiB", "GiB"];

    if bytes < 1024 {
        return format!("{bytes} B");
    }

    // Dividing by a power of two is exact in f64, so repeated division yields
    // the same value as a single division by the unit size.
    let mut value = bytes as f64 / STEP;
    let mut unit = 0;
    // Step up while the value, rounded to one decimal, would print as >= 1024.
    while unit + 1 < UNITS.len() && (value * 10.0).round() / 10.0 >= STEP {
        value /= STEP;
        unit += 1;
    }
    format!("{value:.1} {}", UNITS[unit])
}
/// Formats a duration for status messages.
///
/// Durations of one second or more are shown in seconds with two decimals
/// (`"1.50s"`); shorter ones are shown as whole milliseconds (`"412ms"`),
/// truncating any sub-millisecond remainder.
fn human_duration(d: Duration) -> String {
    match d.as_secs_f64() {
        seconds if seconds >= 1.0 => format!("{seconds:.2}s"),
        _ => format!("{}ms", d.as_millis()),
    }
}
/// Reference-counted callback that receives a borrowed [`Status`]; it must be
/// `Send + Sync` so it can be shared across threads.
pub(crate) type StatusCallback = Arc<dyn Fn(&Status) + Send + Sync>;
/// Reference-counted callback that receives a borrowed [`DriverLogLine`]; it
/// must be `Send + Sync` so it can be shared across threads.
pub(crate) type DriverLogCallback = Arc<dyn Fn(&DriverLogLine) + Send + Sync>;
/// One registered status callback together with the id used to remove it again.
pub(crate) struct StatusSubscriber {
// Id allocated by `Emitter::add_arc`; `Subscription::drop` filters on it.
id: u64,
// The callback invoked by `Emitter::emit`.
cb: StatusCallback,
}
/// Registry of status subscribers plus the counter that allocates their ids.
///
/// Both fields are `Arc`s, so a cloned `Emitter` shares the same subscriber
/// list and id counter as the original.
#[derive(Clone)]
pub(crate) struct Emitter {
// Current subscriber list; updated by swapping in a freshly built `Vec` (see `rcu` calls).
subs: Arc<ArcSwap<Vec<StatusSubscriber>>>,
// Next subscriber id to hand out; starts at 0 and is incremented per registration.
next_id: Arc<AtomicU64>,
}
impl Emitter {
pub(crate) fn new() -> Self {
Self {
subs: Arc::new(ArcSwap::from_pointee(Vec::new())),
next_id: Arc::new(AtomicU64::new(0)),
}
}
pub(crate) fn add<F>(&self, f: F) -> Subscription
where
F: Fn(&Status) + Send + Sync + 'static,
{
let cb: StatusCallback = Arc::new(f);
self.add_arc(cb)
}
pub(crate) fn add_arc(&self, cb: StatusCallback) -> Subscription {
let id = self.next_id.fetch_add(1, Ordering::Relaxed);
self.subs.rcu(|current| {
let mut next = (**current).iter().map(clone_sub).collect::<Vec<_>>();
next.push(StatusSubscriber {
id,
cb: Arc::clone(&cb),
});
next
});
Subscription {
id,
subs: Arc::downgrade(&self.subs),
}
}
pub(crate) fn emit(&self, status: Status) {
emit_tracing(&status);
let list = self.subs.load();
for s in list.iter() {
let cb = Arc::clone(&s.cb);
let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| (cb)(&status)));
}
}
}
/// Duplicates a subscriber entry: the id is copied and the callback `Arc` is
/// cloned, so both entries share the same underlying callback.
fn clone_sub(s: &StatusSubscriber) -> StatusSubscriber {
    let StatusSubscriber { id, cb } = s;
    StatusSubscriber {
        id: *id,
        cb: Arc::clone(cb),
    }
}
/// Forwards a status event to `tracing` under the `thirtyfour::manager`
/// target, using the event's `Display` text as the message.
///
/// Download retries are logged at WARN, early resolution/extraction steps at
/// DEBUG, and everything else at INFO. The match is exhaustive on purpose so
/// that a new `Status` variant forces a decision about its level.
fn emit_tracing(status: &Status) {
    match status {
        // INFO: the main lifecycle milestones.
        Status::DriverVersionResolved { .. }
        | Status::DriverCacheHit { .. }
        | Status::DriverDownloadStarted { .. }
        | Status::DriverDownloadComplete { .. }
        | Status::DriverProcessSpawned { .. }
        | Status::DriverReady { .. }
        | Status::DriverReused { .. }
        | Status::SessionStarting { .. }
        | Status::SessionStarted { .. }
        | Status::SessionEnded { .. }
        | Status::DriverShutdown { .. } => info!(target: "thirtyfour::manager", "{status}"),

        // DEBUG: intermediate steps.
        Status::BrowserKindResolved { .. }
        | Status::LocalBrowserDetected { .. }
        | Status::DriverVersionResolving { .. }
        | Status::DriverArchiveExtracted { .. } => {
            debug!(target: "thirtyfour::manager", "{status}")
        }

        // WARN: a download attempt failed and is being retried.
        Status::DriverDownloadRetry { .. } => warn!(target: "thirtyfour::manager", "{status}"),
    }
}
/// Handle for a status callback registered through `Emitter::add` or
/// `Emitter::add_arc`.
///
/// Dropping the handle removes the callback from the emitter's subscriber
/// list, so it must be kept alive for as long as events should be delivered.
/// The handle holds only a weak reference to that list; if the list is
/// already gone, dropping the handle does nothing.
#[must_use = "dropping a `Subscription` immediately unregisters its callback"]
pub struct Subscription {
    // Id of the subscriber entry to remove on drop.
    id: u64,
    // Weak link back to the emitter's subscriber list.
    subs: std::sync::Weak<ArcSwap<Vec<StatusSubscriber>>>,
}
impl fmt::Debug for Subscription {
    /// Prints only the subscription id; the link to the subscriber list is omitted.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("Subscription");
        out.field("id", &self.id);
        out.finish()
    }
}
impl Drop for Subscription {
fn drop(&mut self) {
if let Some(subs) = self.subs.upgrade() {
let id = self.id;
subs.rcu(|current| {
(**current).iter().filter(|s| s.id != id).map(clone_sub).collect::<Vec<_>>()
});
}
}
}
/// Opaque numeric identifier attached to driver log lines (see
/// `DriverLogLine::driver_id`); displayed as `driver#<n>`.
///
/// The inner value is visible only to the parent module.
// NOTE(review): presumably one id per managed driver process — confirm against the allocator.
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
pub struct DriverId(pub(super) u64);
impl DriverId {
pub(crate) fn from_raw(n: u64) -> Self {
DriverId(n)
}
}
impl Display for DriverId {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(f, "driver#{}", self.0)
}
}
/// Which output stream a driver log line was read from.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DriverStream {
/// Standard output; displayed as `stdout`.
Stdout,
/// Standard error; displayed as `stderr`.
Stderr,
}
impl Display for DriverStream {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
let s = match self {
DriverStream::Stdout => "stdout",
DriverStream::Stderr => "stderr",
};
f.write_str(s)
}
}
/// A single line of driver output together with the metadata describing
/// where it came from. This is the value passed to log callbacks.
#[derive(Debug, Clone)]
pub struct DriverLogLine {
/// Identifier of the driver the line belongs to.
pub driver_id: DriverId,
/// Browser kind associated with that driver.
pub browser: BrowserKind,
/// Driver version string.
pub version: String,
/// Port number associated with the driver.
// NOTE(review): presumably the port the driver listens on — confirm at the producer.
pub port: u16,
/// Stream the line was read from.
pub stream: DriverStream,
/// The text of the line.
// NOTE(review): whether a trailing newline is stripped is not visible here — confirm.
pub line: String,
}
/// One registered driver-log callback together with the id used to remove it again.
pub(crate) struct LogSubscriber {
// Id allocated by `LogSubscribers::add_arc`; `DriverLogSubscription::drop` filters on it.
pub(crate) id: u64,
// The callback invoked by `LogSubscribers::dispatch`.
pub(crate) cb: DriverLogCallback,
}
/// Duplicates a log subscriber entry: the id is copied and the callback `Arc`
/// is cloned, so both entries share the same underlying callback.
pub(crate) fn clone_log_sub(s: &LogSubscriber) -> LogSubscriber {
    let LogSubscriber { id, cb } = s;
    LogSubscriber {
        id: *id,
        cb: Arc::clone(cb),
    }
}
/// Registry of driver-log subscribers plus the counter that allocates their ids.
///
/// Both fields are `Arc`s, so a cloned `LogSubscribers` shares the same
/// subscriber list and id counter as the original.
#[derive(Clone)]
pub(crate) struct LogSubscribers {
// Current subscriber list; updated by swapping in a freshly built `Vec` (see `rcu` calls).
pub(crate) subs: Arc<ArcSwap<Vec<LogSubscriber>>>,
// Next subscriber id to hand out; starts at 0 and is incremented per registration.
next_id: Arc<AtomicU64>,
}
impl LogSubscribers {
    /// Creates a registry with no subscribers; ids start at 0.
    pub(crate) fn new() -> Self {
        let subs = Arc::new(ArcSwap::from_pointee(Vec::new()));
        let next_id = Arc::new(AtomicU64::new(0));
        Self { subs, next_id }
    }

    /// Registers `f` as a driver-log callback.
    ///
    /// Convenience wrapper that boxes the closure into a
    /// [`DriverLogCallback`] and forwards to [`LogSubscribers::add_arc`].
    /// The callback stays registered until the returned
    /// [`DriverLogSubscription`] is dropped.
    pub(crate) fn add<F>(&self, f: F) -> DriverLogSubscription
    where
        F: Fn(&DriverLogLine) + Send + Sync + 'static,
    {
        self.add_arc(Arc::new(f))
    }

    /// Registers an already shared callback and returns the handle that
    /// unregisters it on drop.
    pub(crate) fn add_arc(&self, cb: DriverLogCallback) -> DriverLogSubscription {
        let id = self.next_id.fetch_add(1, Ordering::Relaxed);

        // `rcu` may call the closure more than once if the list changes
        // concurrently, so the callback `Arc` is cloned on every call.
        self.subs.rcu(|current| {
            current
                .iter()
                .map(clone_log_sub)
                .chain(std::iter::once(LogSubscriber {
                    id,
                    cb: Arc::clone(&cb),
                }))
                .collect::<Vec<_>>()
        });

        // The handle only holds a weak reference, so it does not keep the
        // subscriber list alive on its own.
        let subs = Arc::downgrade(&self.subs);
        DriverLogSubscription { id, subs }
    }

    /// Calls every registered callback with `line` on the current thread.
    ///
    /// A panic inside a callback is caught and discarded so that the
    /// remaining callbacks still run.
    pub(crate) fn dispatch(&self, line: &DriverLogLine) {
        let snapshot = self.subs.load();
        for sub in snapshot.iter() {
            let cb = Arc::clone(&sub.cb);
            let guarded = std::panic::AssertUnwindSafe(|| cb(line));
            let _ = std::panic::catch_unwind(guarded);
        }
    }
}
/// Handle for a driver-log callback registered through `LogSubscribers::add`
/// or `LogSubscribers::add_arc`.
///
/// Dropping the handle removes the callback from the subscriber list, so it
/// must be kept alive for as long as log lines should be delivered. The
/// handle holds only a weak reference to that list; if the list is already
/// gone, dropping the handle does nothing.
#[must_use = "dropping a `DriverLogSubscription` immediately unregisters its callback"]
pub struct DriverLogSubscription {
    // Id of the subscriber entry to remove on drop.
    id: u64,
    // Weak link back to the registry's subscriber list.
    subs: std::sync::Weak<ArcSwap<Vec<LogSubscriber>>>,
}
impl fmt::Debug for DriverLogSubscription {
    /// Prints only the subscription id; the link to the subscriber list is omitted.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("DriverLogSubscription");
        out.field("id", &self.id);
        out.finish()
    }
}
impl Drop for DriverLogSubscription {
fn drop(&mut self) {
if let Some(subs) = self.subs.upgrade() {
let id = self.id;
subs.rcu(|current| {
(**current).iter().filter(|s| s.id != id).map(clone_log_sub).collect::<Vec<_>>()
});
}
}
}
// Unit tests: the first group pins the exact `Display` text of selected
// `Status` variants and helper types; the second group exercises subscriber
// registration, removal on drop, and panic isolation.
#[cfg(test)]
mod tests {
use super::*;
use std::path::PathBuf;
use std::sync::Mutex;
// Message with only the driver-name prefix.
#[test]
fn display_browser_kind_resolved() {
let s = Status::BrowserKindResolved {
browser: BrowserKind::Chrome,
};
assert_eq!(s.to_string(), "chromedriver: resolved browser kind");
}
// Path fields are rendered via `Path::display`.
#[test]
fn display_local_browser_detected() {
let s = Status::LocalBrowserDetected {
browser: BrowserKind::Firefox,
version: "128.0.1".to_string(),
binary: PathBuf::from("/usr/bin/firefox"),
};
assert_eq!(
s.to_string(),
"geckodriver: detected local browser 128.0.1 at /usr/bin/firefox"
);
}
// The version source is rendered with its own `Display` label.
#[test]
fn display_driver_version_resolved() {
let s = Status::DriverVersionResolved {
browser: BrowserKind::Edge,
version: "126.0.0".to_string(),
source: VersionSource::Latest,
};
assert_eq!(s.to_string(), "msedgedriver 126.0.0: resolved (source: latest)");
}
#[test]
fn display_driver_cache_hit() {
let s = Status::DriverCacheHit {
browser: BrowserKind::Chrome,
version: "126.0.6478.126".to_string(),
path: PathBuf::from("/cache/chromedriver"),
};
assert_eq!(s.to_string(), "chromedriver 126.0.6478.126: cache hit at /cache/chromedriver");
}
// Covers both humanising helpers: bytes in MiB and a duration of at least one second.
#[test]
fn display_driver_download_complete() {
let s = Status::DriverDownloadComplete {
browser: BrowserKind::Chrome,
version: "126.0.6478.126".to_string(),
bytes: 5 * 1024 * 1024,
duration: Duration::from_millis(1500),
};
assert_eq!(
s.to_string(),
"chromedriver 126.0.6478.126: download complete (5.0 MiB in 1.50s)"
);
}
// Sub-second durations are rendered in milliseconds.
#[test]
fn display_driver_ready() {
let s = Status::DriverReady {
browser: BrowserKind::Chrome,
version: "126.0.6478.126".to_string(),
url: "http://127.0.0.1:51234".to_string(),
elapsed: Duration::from_millis(412),
};
assert_eq!(
s.to_string(),
"chromedriver 126.0.6478.126: ready at http://127.0.0.1:51234 in 412ms"
);
}
// Only the first 8 characters of the session id appear in the message.
#[test]
fn display_session_started_truncates_id() {
let s = Status::SessionStarted {
browser: BrowserKind::Chrome,
session_id: "abc123def456".to_string(),
url: "http://127.0.0.1:51234".to_string(),
};
assert_eq!(
s.to_string(),
"chromedriver: session abc123de started at http://127.0.0.1:51234"
);
}
#[test]
fn display_session_ended() {
let s = Status::SessionEnded {
browser: BrowserKind::Chrome,
session_id: "abc123def456".to_string(),
};
assert_eq!(s.to_string(), "chromedriver: session abc123de ended");
}
#[test]
fn display_driver_shutdown() {
let s = Status::DriverShutdown {
browser: BrowserKind::Firefox,
version: "0.36.0".to_string(),
port: 51234,
};
assert_eq!(s.to_string(), "geckodriver 0.36.0: shutdown (port 51234)");
}
#[test]
fn display_driver_id_and_stream() {
assert_eq!(DriverId(7).to_string(), "driver#7");
assert_eq!(DriverStream::Stdout.to_string(), "stdout");
assert_eq!(DriverStream::Stderr.to_string(), "stderr");
}
// Every live subscriber receives each emitted status exactly once.
#[test]
fn emitter_dispatches_to_all_subscribers() {
let e = Emitter::new();
let log_a: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
let log_b: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
let a = Arc::clone(&log_a);
// The handles are bound to names so they stay alive until the end of the test.
let _sub_a = e.add(move |s| a.lock().unwrap().push(s.to_string()));
let b = Arc::clone(&log_b);
let _sub_b = e.add(move |s| b.lock().unwrap().push(s.to_string()));
e.emit(Status::BrowserKindResolved {
browser: BrowserKind::Chrome,
});
assert_eq!(log_a.lock().unwrap().len(), 1);
assert_eq!(log_b.lock().unwrap().len(), 1);
}
// Dropping the handle removes the callback from the emitter.
#[test]
fn dropping_subscription_unsubscribes() {
let e = Emitter::new();
let log: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
let captured = Arc::clone(&log);
let sub = e.add(move |s| captured.lock().unwrap().push(s.to_string()));
e.emit(Status::BrowserKindResolved {
browser: BrowserKind::Chrome,
});
assert_eq!(log.lock().unwrap().len(), 1);
drop(sub);
e.emit(Status::BrowserKindResolved {
browser: BrowserKind::Firefox,
});
assert_eq!(log.lock().unwrap().len(), 1, "second emit must not reach a dropped subscriber");
}
// A panicking callback is registered first; the later callback must still run on every emit.
#[test]
fn panicking_subscriber_does_not_block_others() {
let e = Emitter::new();
let log: Arc<Mutex<u32>> = Arc::new(Mutex::new(0));
let _bad = e.add(|_| panic!("nope"));
let counter = Arc::clone(&log);
let _good = e.add(move |_| *counter.lock().unwrap() += 1);
e.emit(Status::BrowserKindResolved {
browser: BrowserKind::Chrome,
});
e.emit(Status::BrowserKindResolved {
browser: BrowserKind::Firefox,
});
assert_eq!(*log.lock().unwrap(), 2);
}
// Same register / dispatch / drop cycle for the driver-log registry.
#[test]
fn log_subscribers_dispatch_and_drop() {
let subs = LogSubscribers::new();
let log: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
let captured = Arc::clone(&log);
let sub = subs.add(move |line| captured.lock().unwrap().push(line.line.clone()));
let l1 = DriverLogLine {
driver_id: DriverId(1),
browser: BrowserKind::Chrome,
version: "126".to_string(),
port: 51234,
stream: DriverStream::Stdout,
line: "hello".to_string(),
};
subs.dispatch(&l1);
assert_eq!(log.lock().unwrap().as_slice(), &["hello"]);
drop(sub);
let l2 = DriverLogLine {
line: "world".to_string(),
..l1.clone()
};
subs.dispatch(&l2);
assert_eq!(log.lock().unwrap().len(), 1, "dropped log subscription must stop receiving");
}
}