statime_linux/
observer.rs

1use std::{fs::Permissions, os::unix::prelude::PermissionsExt, path::Path, time::Instant};
2
3use statime::{
4    config::TimePropertiesDS,
5    observability::{
6        current::CurrentDS, default::DefaultDS, parent::ParentDS, port::PortDS, PathTraceDS,
7    },
8};
9use tokio::{io::AsyncWriteExt, net::UnixStream, task::JoinHandle};
10
11use crate::{
12    config::Config,
13    metrics::exporter::{ObservableState, ProgramData},
14};
15
/// Observable version of the InstanceState struct.
///
/// A cloneable, serde-serializable bundle of the PTP instance datasets. The
/// observer task receives values of this type through a
/// `tokio::sync::watch` channel and copies the most recent one into the
/// `instance` field of the `ObservableState` it writes to each client.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ObservableInstanceState {
    /// A concrete implementation of the PTP Default dataset (IEEE1588-2019
    /// section 8.2.1)
    pub default_ds: DefaultDS,
    /// A concrete implementation of the PTP Current dataset (IEEE1588-2019
    /// section 8.2.2)
    pub current_ds: CurrentDS,
    /// A concrete implementation of the PTP Parent dataset (IEEE1588-2019
    /// section 8.2.3)
    pub parent_ds: ParentDS,
    /// A concrete implementation of the PTP Time Properties dataset
    /// (IEEE1588-2019 section 8.2.4)
    pub time_properties_ds: TimePropertiesDS,
    /// A concrete implementation of the PTP Path Trace dataset (IEEE1588-2019
    /// section 16.2.2)
    pub path_trace_ds: PathTraceDS,
    /// Port datasets for all the ports, one `PortDS` entry per port.
    pub port_ds: Vec<PortDS>,
}
37
38pub async fn spawn(
39    config: &Config,
40    instance_state_receiver: tokio::sync::watch::Receiver<ObservableInstanceState>,
41) -> JoinHandle<std::io::Result<()>> {
42    let config = config.clone();
43    tokio::spawn(async move {
44        let result = observer(config, instance_state_receiver).await;
45        if let Err(ref e) = result {
46            log::warn!("Abnormal termination of the state observer: {e}");
47            log::warn!("The state observer will not be available");
48        }
49        result
50    })
51}
52
53async fn observer(
54    config: Config,
55    instance_state_receiver: tokio::sync::watch::Receiver<ObservableInstanceState>,
56) -> std::io::Result<()> {
57    let start_time = Instant::now();
58
59    let path = match config.observability.observation_path {
60        Some(ref path) => path,
61        None => return Ok(()),
62    };
63
64    // this binary needs to run as root to be able to adjust the system clock.
65    // by default, the socket inherits root permissions, but the client should not
66    // need elevated permissions to read from the socket. So we explicitly set
67    // the permissions
68    let permissions: std::fs::Permissions =
69        PermissionsExt::from_mode(config.observability.observation_permissions);
70
71    let peers_listener = create_unix_socket_with_permissions(path, permissions)?;
72
73    loop {
74        let (mut stream, _addr) = peers_listener.accept().await?;
75
76        let observe = ObservableState {
77            program: ProgramData::with_uptime(start_time.elapsed().as_secs_f64()),
78            instance: instance_state_receiver.borrow().to_owned(),
79        };
80
81        write_json(&mut stream, &observe).await?;
82    }
83}
84
/// Builds an `Err` holding an `std::io::Error` of kind `Other` whose message
/// is `msg`.
///
/// Generic over the success type so it can be returned directly from any
/// function yielding `std::io::Result<T>`.
fn other_error<T>(msg: String) -> std::io::Result<T> {
    let failure = std::io::Error::new(std::io::ErrorKind::Other, msg);
    Err(failure)
}
89
90pub fn create_unix_socket_with_permissions(
91    path: &Path,
92    permissions: Permissions,
93) -> std::io::Result<tokio::net::UnixListener> {
94    let listener = create_unix_socket(path)?;
95
96    std::fs::set_permissions(path, permissions)?;
97
98    Ok(listener)
99}
100
101fn create_unix_socket(path: &Path) -> std::io::Result<tokio::net::UnixListener> {
102    // must unlink path before the bind below (otherwise we get "address already in
103    // use")
104    if path.exists() {
105        use std::os::unix::fs::FileTypeExt;
106
107        let meta = std::fs::metadata(path)?;
108        if !meta.file_type().is_socket() {
109            return other_error(format!("path {path:?} exists but is not a socket"));
110        }
111
112        std::fs::remove_file(path)?;
113    }
114
115    // OS errors are terrible; let's try to do better
116    let error = match tokio::net::UnixListener::bind(path) {
117        Ok(listener) => return Ok(listener),
118        Err(e) => e,
119    };
120
121    // we don create parent directories
122    if let Some(parent) = path.parent() {
123        if !parent.exists() {
124            let msg = format!(
125                r"Could not create observe socket at {:?} because its parent directory does not exist",
126                &path
127            );
128            return other_error(msg);
129        }
130    }
131
132    // otherwise, just forward the OS error
133    let msg = format!(
134        "Could not create observe socket at {:?}: {:?}",
135        &path, error
136    );
137
138    other_error(msg)
139}
140
141pub async fn write_json<T>(stream: &mut UnixStream, value: &T) -> std::io::Result<()>
142where
143    T: serde::Serialize,
144{
145    let bytes = serde_json::to_vec(value).unwrap();
146    stream.write_all(&bytes).await
147}