1#[path = "server/action_router.rs"]
2mod action_router;
3#[path = "server/can_ops.rs"]
4mod can_ops;
5#[path = "server/shared_ops.rs"]
6mod shared_ops;
7#[path = "server/tick_ops.rs"]
8mod tick_ops;
9
10use crate::can::CanSocket;
11use crate::can::dbc::DbcBusOverlay;
12use crate::protocol::{Request, RequestAction, Response};
13use crate::shared::SharedRegion;
14use crate::sim::error::SimError;
15use crate::sim::project::Project;
16use crate::sim::time::TimeEngine;
17use crate::sim::types::{SignalType, SignalValue, SimCanBusDesc, SimCanFrame, SimSharedDesc};
18use globset::{Glob, GlobMatcher};
19use std::collections::{BTreeSet, HashMap};
20use std::path::PathBuf;
21use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
22use tokio::net::{UnixListener, UnixStream};
23use tokio::sync::{mpsc, oneshot, watch};
24
/// Mutable state of one daemon instance.
///
/// Built by `run_listener` via [`DaemonState::new`] and handed to the tick task
/// (`tick_ops::run_tick_task`); action handlers in this file receive it as
/// `&mut DaemonState`.
pub struct DaemonState {
    /// Session name this daemon was started for (as passed to `run_listener`).
    session: String,
    /// Path of the Unix socket the listener is bound to.
    socket_path: PathBuf,
    /// Optional env value; `run_listener` passes the same value to
    /// `lifecycle::write_env_tag` at start-up.
    env: Option<String>,
    /// Loaded project; used for signal lookup (see `select_signal_ids`).
    project: Project,
    /// Attached CAN buses. NOTE(review): key is presumably the bus name — confirm.
    can_attached: HashMap<String, AttachedCanBus>,
    /// DBC overlays. NOTE(review): presumably keyed like `can_attached` — confirm.
    dbc_overlays: HashMap<String, DbcBusOverlay>,
    /// Attached shared channels. NOTE(review): key is presumably the channel name — confirm.
    shared_attached: HashMap<String, AttachedSharedChannel>,
    /// CAN frames stored per string key and then per `u32` key.
    /// NOTE(review): presumably bus name -> arbitration id -> latest frame — confirm.
    frame_state: HashMap<String, HashMap<u32, SimCanFrame>>,
    /// Time engine; starts as `TimeEngine::default()`.
    time: TimeEngine,
    /// Shutdown flag; starts as `false`.
    shutdown: bool,
}
37
/// Value type of `DaemonState::can_attached`: a CAN bus description paired with
/// the socket used for that bus.
struct AttachedCanBus {
    /// Description of the bus.
    meta: SimCanBusDesc,
    /// CAN socket associated with the bus.
    socket: CanSocket,
}
42
/// Value type of `DaemonState::shared_attached`: a shared-channel description
/// paired with its shared region.
struct AttachedSharedChannel {
    /// Description of the shared channel.
    meta: SimSharedDesc,
    /// Shared region backing the channel.
    region: SharedRegion,
    /// NOTE(review): presumably `true` when this side attached as the writer — confirm.
    writer: bool,
}
48
/// One request in flight from a connection handler to the owner of the
/// [`DaemonState`], sent over the `mpsc` action channel.
struct ActionMessage {
    /// The decoded client request.
    request: Request,
    /// One-shot sender on which the resulting [`Response`] is delivered back to
    /// the connection handler that is awaiting it.
    response_tx: oneshot::Sender<Response>,
}
53
54impl DaemonState {
55 pub fn new(
56 session: String,
57 socket_path: PathBuf,
58 project: Project,
59 env: Option<String>,
60 ) -> Self {
61 Self {
62 session,
63 socket_path,
64 env,
65 project,
66 can_attached: HashMap::new(),
67 dbc_overlays: HashMap::new(),
68 shared_attached: HashMap::new(),
69 frame_state: HashMap::new(),
70 time: TimeEngine::default(),
71 shutdown: false,
72 }
73 }
74
75 fn parse_value(signal_type: SignalType, raw: &str) -> Result<SignalValue, SimError> {
76 match signal_type {
77 SignalType::Bool => match raw {
78 "true" | "1" | "True" | "TRUE" => Ok(SignalValue::Bool(true)),
79 "false" | "0" | "False" | "FALSE" => Ok(SignalValue::Bool(false)),
80 _ => Err(SimError::InvalidArg(format!("invalid bool value '{raw}'"))),
81 },
82 SignalType::U32 => raw
83 .parse::<u32>()
84 .map(SignalValue::U32)
85 .map_err(|_| SimError::InvalidArg(format!("invalid u32 value '{raw}'"))),
86 SignalType::I32 => raw
87 .parse::<i32>()
88 .map(SignalValue::I32)
89 .map_err(|_| SimError::InvalidArg(format!("invalid i32 value '{raw}'"))),
90 SignalType::F32 => raw
91 .parse::<f32>()
92 .map(SignalValue::F32)
93 .map_err(|_| SimError::InvalidArg(format!("invalid f32 value '{raw}'"))),
94 SignalType::F64 => raw
95 .parse::<f64>()
96 .map(SignalValue::F64)
97 .map_err(|_| SimError::InvalidArg(format!("invalid f64 value '{raw}'"))),
98 }
99 }
100
101 fn select_signal_ids(
102 project: &Project,
103 selectors: &[String],
104 ) -> Result<Vec<u32>, Box<dyn std::error::Error + Send + Sync>> {
105 if selectors.is_empty() {
106 return Err("missing signal selectors".into());
107 }
108 let mut ids = BTreeSet::new();
109 for selector in selectors {
110 if selector == "*" {
111 ids.extend(project.signals().iter().map(|s| s.id));
112 continue;
113 }
114 if let Some(raw_id) = selector.strip_prefix('#') {
115 let id = raw_id.parse::<u32>()?;
116 if project.signal_by_id(id).is_none() {
117 return Err(format!("signal not found: '#{id}'").into());
118 }
119 ids.insert(id);
120 continue;
121 }
122 if selector.contains('*') || selector.contains('?') || selector.contains('[') {
123 let matcher = compile_glob(selector)?;
124 let mut matched = false;
125 for signal in project.signals() {
126 if matcher.is_match(&signal.name) {
127 ids.insert(signal.id);
128 matched = true;
129 }
130 }
131 if !matched {
132 return Err(format!("signal glob matched nothing: '{selector}'").into());
133 }
134 continue;
135 }
136
137 if let Some(id) = project.signal_id_by_name(selector) {
138 ids.insert(id);
139 } else {
140 return Err(format!("signal not found: '{selector}'").into());
141 }
142 }
143 Ok(ids.into_iter().collect())
144 }
145}
146
147fn compile_glob(pattern: &str) -> Result<GlobMatcher, Box<dyn std::error::Error + Send + Sync>> {
148 Ok(Glob::new(pattern)?.compile_matcher())
149}
150
151pub async fn run_listener(
152 session: String,
153 socket_path: PathBuf,
154 project: Project,
155 env: Option<String>,
156) -> Result<(), std::io::Error> {
157 if socket_path.exists() {
158 let _ = std::fs::remove_file(&socket_path);
159 }
160 if let Some(parent) = socket_path.parent() {
161 std::fs::create_dir_all(parent)?;
162 }
163 let listener = UnixListener::bind(&socket_path)?;
164 let pid_path = crate::daemon::lifecycle::pid_path(&session);
165 std::fs::write(&pid_path, std::process::id().to_string())?;
166 crate::daemon::lifecycle::write_env_tag(&session, env.as_deref())
167 .map_err(std::io::Error::other)?;
168
169 let state = DaemonState::new(session.clone(), socket_path.clone(), project, env);
170 let (action_tx, action_rx) = mpsc::channel::<ActionMessage>(256);
171 let (shutdown_tx, mut shutdown_rx) = watch::channel(false);
172
173 let tick_task = tokio::spawn(tick_ops::run_tick_task(state, action_rx, shutdown_tx));
174 let mut listener_error = None;
175
176 loop {
177 tokio::select! {
178 changed = shutdown_rx.changed() => {
179 match changed {
180 Ok(()) if *shutdown_rx.borrow() => break,
181 Ok(()) => {}
182 Err(_) => break,
183 }
184 }
185 accepted = listener.accept() => {
186 match accepted {
187 Ok((stream, _addr)) => {
188 let action_tx = action_tx.clone();
189 tokio::spawn(async move {
190 let _ = handle_connection(stream, action_tx).await;
191 });
192 }
193 Err(e) => {
194 listener_error = Some(e);
195 break;
196 }
197 }
198 }
199 }
200 }
201
202 drop(action_tx);
203 let _ = tick_task.await;
204
205 if socket_path.exists() {
206 let _ = std::fs::remove_file(&socket_path);
207 }
208 if pid_path.exists() {
209 let _ = std::fs::remove_file(pid_path);
210 }
211 crate::daemon::lifecycle::remove_env_tag(&session);
212
213 if let Some(err) = listener_error {
214 return Err(err);
215 }
216 Ok(())
217}
218
219async fn handle_connection(
220 stream: UnixStream,
221 action_tx: mpsc::Sender<ActionMessage>,
222) -> Result<(), std::io::Error> {
223 let (read_half, mut write_half) = stream.into_split();
224 let mut reader = BufReader::new(read_half);
225 let mut line = String::new();
226
227 loop {
228 line.clear();
229 let read = reader.read_line(&mut line).await?;
230 if read == 0 {
231 return Ok(());
232 }
233 let response = match serde_json::from_str::<Request>(line.trim_end()) {
234 Ok(request) => {
235 let request_id = request.id;
236 let (response_tx, response_rx) = oneshot::channel();
237 if action_tx
238 .send(ActionMessage {
239 request,
240 response_tx,
241 })
242 .await
243 .is_err()
244 {
245 Response::err(request_id, "daemon unavailable")
246 } else {
247 match response_rx.await {
248 Ok(response) => response,
249 Err(_) => Response::err(request_id, "daemon unavailable"),
250 }
251 }
252 }
253 Err(e) => Response {
254 id: uuid::Uuid::new_v4(),
255 success: false,
256 data: None,
257 error: Some(format!("invalid request json: {e}")),
258 },
259 };
260 let mut payload = serde_json::to_string(&response).unwrap_or_else(|e| {
261 format!("{{\"success\":false,\"error\":\"response serialization failed: {e}\"}}")
262 });
263 payload.push('\n');
264 write_half.write_all(payload.as_bytes()).await?;
265 }
266}
267
268async fn process_action_message(message: ActionMessage, state: &mut DaemonState) {
269 let response = handle_action(message.request, state).await;
270 let _ = message.response_tx.send(response);
271}
272
273async fn handle_action(request: Request, state: &mut DaemonState) -> Response {
274 let id = request.id;
275 let result = match request.action {
276 RequestAction::Instance(action) => {
277 action_router::dispatch_instance_action(action, state).await
278 }
279 RequestAction::Worker(action) => action_router::dispatch_worker_action(action, state).await,
280 RequestAction::Env(_) => Err("env-owned action sent to instance daemon".to_string()),
281 };
282
283 match result {
284 Ok(data) => Response::ok(id, data),
285 Err(e) => Response::err(id, e),
286 }
287}