1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
use std::{fs::Permissions, os::unix::prelude::PermissionsExt, path::Path, time::Instant};

use statime::{
    config::TimePropertiesDS,
    observability::{current::CurrentDS, default::DefaultDS, parent::ParentDS},
};
use tokio::{io::AsyncWriteExt, net::UnixStream, task::JoinHandle};

use crate::{
    config::Config,
    metrics::exporter::{ObservableState, ProgramData},
};

/// Observable version of the InstanceState struct
///
/// Bundles the four PTP datasets listed below into a single value. The type
/// is `Copy` and derives both `serde::Serialize` and `serde::Deserialize`.
#[derive(Debug, Copy, Clone, serde::Serialize, serde::Deserialize)]
pub struct ObservableInstanceState {
    /// A concrete implementation of the PTP Default dataset (IEEE1588-2019
    /// section 8.2.1)
    pub default_ds: DefaultDS,
    /// A concrete implementation of the PTP Current dataset (IEEE1588-2019
    /// section 8.2.2)
    pub current_ds: CurrentDS,
    /// A concrete implementation of the PTP Parent dataset (IEEE1588-2019
    /// section 8.2.3)
    pub parent_ds: ParentDS,
    /// A concrete implementation of the PTP Time Properties dataset
    /// (IEEE1588-2019 section 8.2.4)
    pub time_properties_ds: TimePropertiesDS,
}

/// Starts the state observer on a new tokio task.
///
/// The configuration is cloned so the task owns its own copy. The returned
/// handle resolves to whatever the observer returned; if that is an error, two
/// warnings are logged before the error is handed back through the handle.
pub async fn spawn(
    config: &Config,
    instance_state_receiver: tokio::sync::watch::Receiver<ObservableInstanceState>,
) -> JoinHandle<std::io::Result<()>> {
    let owned_config = config.clone();

    let task = async move {
        let outcome = observer(owned_config, instance_state_receiver).await;

        match &outcome {
            Ok(()) => {}
            Err(e) => {
                log::warn!("Abnormal termination of the state observer: {e}");
                log::warn!("The state observer will not be available");
            }
        }

        outcome
    };

    tokio::spawn(task)
}

async fn observer(
    config: Config,
    instance_state_receiver: tokio::sync::watch::Receiver<ObservableInstanceState>,
) -> std::io::Result<()> {
    let start_time = Instant::now();

    let path = match config.observability.observation_path {
        Some(ref path) => path,
        None => return Ok(()),
    };

    // this binary needs to run as root to be able to adjust the system clock.
    // by default, the socket inherits root permissions, but the client should not
    // need elevated permissions to read from the socket. So we explicitly set
    // the permissions
    let permissions: std::fs::Permissions =
        PermissionsExt::from_mode(config.observability.observation_permissions);

    let peers_listener = create_unix_socket_with_permissions(path, permissions)?;

    loop {
        let (mut stream, _addr) = peers_listener.accept().await?;

        let observe = ObservableState {
            program: ProgramData::with_uptime(start_time.elapsed().as_secs_f64()),
            instance: instance_state_receiver.borrow().to_owned(),
        };

        write_json(&mut stream, &observe).await?;
    }
}

/// Wraps `msg` in an `std::io::Error` of kind `Other` and returns it as the
/// `Err` variant of an `std::io::Result<T>`.
fn other_error<T>(msg: String) -> std::io::Result<T> {
    let error = std::io::Error::new(std::io::ErrorKind::Other, msg);
    Err(error)
}

/// Creates a Unix listener bound to `path` and then applies `permissions` to
/// the file at that path.
///
/// # Errors
///
/// Returns an error if the socket cannot be created or if setting the
/// permissions fails; in the latter case the freshly created listener is
/// dropped.
pub fn create_unix_socket_with_permissions(
    path: &Path,
    permissions: Permissions,
) -> std::io::Result<tokio::net::UnixListener> {
    let listener = create_unix_socket(path)?;

    // only hand the listener out once the permissions are in place
    std::fs::set_permissions(path, permissions).map(|()| listener)
}

/// Binds a Unix listener at `path`, removing a socket file that already
/// exists there first.
///
/// # Errors
///
/// Returns an error if `path` exists but is not a socket, if the existing
/// socket cannot be inspected or removed, or if binding fails. Bind failures
/// are reported with a message that names the path and, where applicable,
/// points out that the parent directory is missing.
fn create_unix_socket(path: &Path) -> std::io::Result<tokio::net::UnixListener> {
    use std::os::unix::fs::FileTypeExt;

    // An existing file at `path` must be unlinked before binding (otherwise
    // the bind fails with "address already in use"). Only sockets are
    // removed; anything else is reported rather than deleted.
    if path.exists() {
        let is_socket = std::fs::metadata(path)?.file_type().is_socket();
        if !is_socket {
            return other_error(format!("path {path:?} exists but is not a socket"));
        }

        std::fs::remove_file(path)?;
    }

    // Raw OS errors are not very helpful here, so on failure build a message
    // that at least mentions which path was involved.
    let bind_error = match tokio::net::UnixListener::bind(path) {
        Ok(listener) => return Ok(listener),
        Err(bind_error) => bind_error,
    };

    // we don't create parent directories, so a missing parent gets its own
    // dedicated message; everything else just forwards the OS error
    let parent_missing = matches!(path.parent(), Some(parent) if !parent.exists());

    let msg = if parent_missing {
        format!(
            r"Could not create observe socket at {:?} because its parent directory does not exist",
            &path
        )
    } else {
        format!(
            "Could not create observe socket at {:?}: {:?}",
            &path, bind_error
        )
    };

    other_error(msg)
}

/// Serializes `value` to JSON and writes all of the resulting bytes to
/// `stream`.
///
/// # Errors
///
/// Returns an error if `value` cannot be serialized or if writing to the
/// stream fails.
pub async fn write_json<T>(stream: &mut UnixStream, value: &T) -> std::io::Result<()>
where
    T: serde::Serialize,
{
    // `serde_json::Error` converts into `std::io::Error`, so a serialization
    // failure is reported to the caller instead of panicking the task.
    let bytes = serde_json::to_vec(value)?;
    stream.write_all(&bytes).await
}