statime_linux/
observer.rs1use std::{fs::Permissions, os::unix::prelude::PermissionsExt, path::Path, time::Instant};
2
3use statime::{
4 config::TimePropertiesDS,
5 observability::{
6 current::CurrentDS, default::DefaultDS, parent::ParentDS, port::PortDS, PathTraceDS,
7 },
8};
9use tokio::{io::AsyncWriteExt, net::UnixStream, task::JoinHandle};
10
11use crate::{
12 config::Config,
13 metrics::exporter::{ObservableState, ProgramData},
14};
15
16#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
18pub struct ObservableInstanceState {
19 pub default_ds: DefaultDS,
22 pub current_ds: CurrentDS,
25 pub parent_ds: ParentDS,
28 pub time_properties_ds: TimePropertiesDS,
31 pub path_trace_ds: PathTraceDS,
34 pub port_ds: Vec<PortDS>,
36}
37
38pub async fn spawn(
39 config: &Config,
40 instance_state_receiver: tokio::sync::watch::Receiver<ObservableInstanceState>,
41) -> JoinHandle<std::io::Result<()>> {
42 let config = config.clone();
43 tokio::spawn(async move {
44 let result = observer(config, instance_state_receiver).await;
45 if let Err(ref e) = result {
46 log::warn!("Abnormal termination of the state observer: {e}");
47 log::warn!("The state observer will not be available");
48 }
49 result
50 })
51}
52
53async fn observer(
54 config: Config,
55 instance_state_receiver: tokio::sync::watch::Receiver<ObservableInstanceState>,
56) -> std::io::Result<()> {
57 let start_time = Instant::now();
58
59 let path = match config.observability.observation_path {
60 Some(ref path) => path,
61 None => return Ok(()),
62 };
63
64 let permissions: std::fs::Permissions =
69 PermissionsExt::from_mode(config.observability.observation_permissions);
70
71 let peers_listener = create_unix_socket_with_permissions(path, permissions)?;
72
73 loop {
74 let (mut stream, _addr) = peers_listener.accept().await?;
75
76 let observe = ObservableState {
77 program: ProgramData::with_uptime(start_time.elapsed().as_secs_f64()),
78 instance: instance_state_receiver.borrow().to_owned(),
79 };
80
81 write_json(&mut stream, &observe).await?;
82 }
83}
84
/// Wraps `msg` in an [`std::io::Error`] of kind [`std::io::ErrorKind::Other`]
/// and returns it as the `Err` variant of an `std::io::Result<T>`.
fn other_error<T>(msg: String) -> std::io::Result<T> {
    Err(std::io::Error::new(std::io::ErrorKind::Other, msg))
}
89
90pub fn create_unix_socket_with_permissions(
91 path: &Path,
92 permissions: Permissions,
93) -> std::io::Result<tokio::net::UnixListener> {
94 let listener = create_unix_socket(path)?;
95
96 std::fs::set_permissions(path, permissions)?;
97
98 Ok(listener)
99}
100
101fn create_unix_socket(path: &Path) -> std::io::Result<tokio::net::UnixListener> {
102 if path.exists() {
105 use std::os::unix::fs::FileTypeExt;
106
107 let meta = std::fs::metadata(path)?;
108 if !meta.file_type().is_socket() {
109 return other_error(format!("path {path:?} exists but is not a socket"));
110 }
111
112 std::fs::remove_file(path)?;
113 }
114
115 let error = match tokio::net::UnixListener::bind(path) {
117 Ok(listener) => return Ok(listener),
118 Err(e) => e,
119 };
120
121 if let Some(parent) = path.parent() {
123 if !parent.exists() {
124 let msg = format!(
125 r"Could not create observe socket at {:?} because its parent directory does not exist",
126 &path
127 );
128 return other_error(msg);
129 }
130 }
131
132 let msg = format!(
134 "Could not create observe socket at {:?}: {:?}",
135 &path, error
136 );
137
138 other_error(msg)
139}
140
141pub async fn write_json<T>(stream: &mut UnixStream, value: &T) -> std::io::Result<()>
142where
143 T: serde::Serialize,
144{
145 let bytes = serde_json::to_vec(value).unwrap();
146 stream.write_all(&bytes).await
147}