Skip to main content

sc_observability/
follow.rs

1#![expect(
2    clippy::missing_errors_doc,
3    reason = "follow-session error behavior is documented at the public facade level, and repeating it here would add low-signal boilerplate"
4)]
5#![expect(
6    clippy::must_use_candidate,
7    reason = "lightweight follow accessors are intentionally kept free of repetitive must_use decoration"
8)]
9
10use std::path::PathBuf;
11use std::sync::Arc;
12use std::sync::atomic::{AtomicBool, Ordering};
13
14use sc_observability_types::{LogQuery, LogSnapshot, QueryError, QueryHealthReport};
15
16use crate::health::QueryHealthTracker;
17use crate::query::{self, TrackedFile};
18
19/// Tail-style follow session over the active JSONL log and its rotation set.
20#[derive(Debug)]
21pub struct LogFollowSession {
22    active_log_path: PathBuf,
23    query: LogQuery,
24    tracked_files: Vec<TrackedFile>,
25    health: Arc<QueryHealthTracker>,
26    shutdown: Option<Arc<AtomicBool>>,
27}
28
29impl LogFollowSession {
30    pub(crate) fn with_health(
31        active_log_path: PathBuf,
32        query: LogQuery,
33        health: Arc<QueryHealthTracker>,
34        shutdown: Option<Arc<AtomicBool>>,
35    ) -> Result<Self, QueryError> {
36        query.validate()?;
37        let tracked_files = query::start_follow_tracking(&active_log_path)?;
38        Ok(Self {
39            active_log_path,
40            query,
41            tracked_files,
42            health,
43            shutdown,
44        })
45    }
46
47    /// Polls for newly appended matching log records since the last call.
48    pub fn poll(&mut self) -> Result<LogSnapshot, QueryError> {
49        if self
50            .shutdown
51            .as_ref()
52            .is_some_and(|shutdown| shutdown.load(Ordering::SeqCst))
53        {
54            let result = Err(query::shutdown_error());
55            self.health.record_result(&result);
56            return result;
57        }
58
59        let result = query::poll_follow_snapshot(
60            &self.active_log_path,
61            &self.query,
62            &mut self.tracked_files,
63        );
64        match result {
65            Ok(outcome) => {
66                if let Some(summary) = outcome.reset_summary {
67                    self.health.record_nonfatal_summary(summary);
68                } else {
69                    self.health.mark_healthy();
70                }
71                Ok(outcome.snapshot)
72            }
73            Err(error) => {
74                self.health.record_error(&error);
75                Err(error)
76            }
77        }
78    }
79
80    /// Returns the current query/follow health snapshot for this session.
81    pub fn health(&self) -> QueryHealthReport {
82        self.health.snapshot()
83    }
84}