1#[path = "server/bootstrap.rs"]
2mod bootstrap;
3#[path = "server/dispatch.rs"]
4mod dispatch;
5#[path = "server/instance_worker.rs"]
6mod instance_worker;
7#[path = "server/tick.rs"]
8mod tick;
9
10use crate::can::CanSocket;
11use crate::can::dbc::{DbcBusOverlay, frame_key_from_frame};
12use crate::daemon::lifecycle::{kill_pid, read_pid};
13use crate::envd::lifecycle::pid_path;
14use crate::envd::spec::EnvSpec;
15use crate::protocol::{
16 CanFrameData, InstanceAction, Request, RequestAction, Response, parse_duration_us,
17};
18use crate::sim::time::TimeEngine;
19#[cfg(test)]
20use crate::sim::types::CAN_FLAG_EXTENDED;
21use crate::sim::types::SimCanFrame;
22use std::collections::{BTreeMap, HashMap};
23use std::path::PathBuf;
24use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
25use tokio::net::{UnixListener, UnixStream};
26use tokio::sync::{mpsc, oneshot, watch};
27use tokio::time::timeout;
28
29struct EnvState {
30 name: String,
31 socket_path: PathBuf,
32 tick_duration_us: u32,
33 instances: Vec<String>,
34 instance_workers: HashMap<String, instance_worker::InstanceWorker>,
35 time: TimeEngine,
36 can_buses: BTreeMap<String, EnvCanBusState>,
37 shutdown: bool,
38}
39
40struct EnvCanBusState {
41 name: String,
42 vcan_iface: String,
43 fd_capable: bool,
44 bitrate: u32,
45 bitrate_data: u32,
46 socket: CanSocket,
47 dbc: Option<DbcBusOverlay>,
48 latest_frames: HashMap<u32, SimCanFrame>,
49 schedules: BTreeMap<String, CanScheduleJob>,
50}
51
52#[derive(Clone)]
53struct CanScheduleJob {
54 job_id: String,
55 arb_id: u32,
56 flags: u8,
57 data_hex: String,
58 frame: SimCanFrame,
59 every_ticks: u64,
60 next_due_tick: u64,
61 enabled: bool,
62}
63
64struct ActionMessage {
65 request: Request,
66 response_tx: oneshot::Sender<Response>,
67}
68
69pub async fn run_listener(socket_path: PathBuf, env_spec: EnvSpec) -> Result<(), std::io::Error> {
70 if socket_path.exists() {
71 let _ = std::fs::remove_file(&socket_path);
72 }
73 if let Some(parent) = socket_path.parent() {
74 std::fs::create_dir_all(parent)?;
75 }
76
77 let state = EnvState::bootstrap(socket_path.clone(), env_spec)
78 .await
79 .map_err(std::io::Error::other)?;
80 let listener = match UnixListener::bind(&socket_path) {
81 Ok(listener) => listener,
82 Err(err) => {
83 cleanup_listener_runtime(&state).await;
84 return Err(err);
85 }
86 };
87 if let Err(err) = std::fs::write(pid_path(&state.name), std::process::id().to_string()) {
88 cleanup_listener_runtime(&state).await;
89 return Err(err);
90 }
91
92 let (action_tx, action_rx) = mpsc::channel::<ActionMessage>(256);
93 let (shutdown_tx, mut shutdown_rx) = watch::channel(false);
94 let actor_task = tokio::spawn(run_actor_task(state, action_rx, shutdown_tx));
95
96 let mut listener_error = None;
97 loop {
98 tokio::select! {
99 changed = shutdown_rx.changed() => {
100 match changed {
101 Ok(()) if *shutdown_rx.borrow() => break,
102 Ok(()) => {}
103 Err(_) => break,
104 }
105 }
106 accepted = listener.accept() => {
107 match accepted {
108 Ok((stream, _)) => {
109 let action_tx = action_tx.clone();
110 tokio::spawn(async move {
111 let _ = handle_connection(stream, action_tx).await;
112 });
113 }
114 Err(err) => {
115 listener_error = Some(err);
116 break;
117 }
118 }
119 }
120 }
121 }
122
123 drop(action_tx);
124 let state = actor_task
125 .await
126 .map_err(|err| std::io::Error::other(err.to_string()))?;
127 cleanup_listener_runtime(&state).await;
128 if let Some(err) = listener_error {
129 return Err(err);
130 }
131 Ok(())
132}
133
134async fn handle_connection(
135 stream: UnixStream,
136 action_tx: mpsc::Sender<ActionMessage>,
137) -> Result<(), std::io::Error> {
138 let (read_half, mut write_half) = stream.into_split();
139 let mut reader = BufReader::new(read_half);
140 let mut line = String::new();
141
142 loop {
143 line.clear();
144 let read = reader.read_line(&mut line).await?;
145 if read == 0 {
146 return Ok(());
147 }
148 let response = match serde_json::from_str::<Request>(line.trim_end()) {
149 Ok(request) => {
150 let request_id = request.id;
151 let (response_tx, response_rx) = oneshot::channel();
152 if action_tx
153 .send(ActionMessage {
154 request,
155 response_tx,
156 })
157 .await
158 .is_err()
159 {
160 Response::err(request_id, "env daemon unavailable")
161 } else {
162 match response_rx.await {
163 Ok(response) => response,
164 Err(_) => Response::err(request_id, "env daemon unavailable"),
165 }
166 }
167 }
168 Err(err) => Response::err(uuid::Uuid::new_v4(), format!("invalid request json: {err}")),
169 };
170 let mut payload = serde_json::to_string(&response).unwrap_or_else(|err| {
171 format!("{{\"success\":false,\"error\":\"response serialization failed: {err}\"}}")
172 });
173 payload.push('\n');
174 write_half.write_all(payload.as_bytes()).await?;
175 }
176}
177
178async fn process_action_message(message: ActionMessage, state: &mut EnvState) {
179 let response = handle_action(message.request, state).await;
180 let _ = message.response_tx.send(response);
181}
182
183async fn handle_action(request: Request, state: &mut EnvState) -> Response {
184 let id = request.id;
185 let result = match request.action {
186 RequestAction::Env(action) => dispatch::dispatch_action(action, state).await,
187 RequestAction::Instance(_) | RequestAction::Worker(_) => {
188 Err("instance-owned action sent to env daemon".to_string())
189 }
190 };
191
192 match result {
193 Ok(data) => Response::ok(id, data),
194 Err(err) => Response::err(id, err),
195 }
196}
197
198async fn run_actor_task(
199 mut state: EnvState,
200 mut action_rx: mpsc::Receiver<ActionMessage>,
201 shutdown_tx: watch::Sender<bool>,
202) -> EnvState {
203 loop {
204 while let Ok(message) = action_rx.try_recv() {
205 process_action_message(message, &mut state).await;
206 }
207
208 if state.shutdown {
209 break;
210 }
211
212 let due_ticks = state.time.tick_realtime_due(state.tick_duration_us);
213 if let Err(err) = tick::advance_due_ticks(&mut state, due_ticks).await {
214 tracing::error!("env '{}' tick loop failed: {err}", state.name);
215 state.shutdown = true;
216 }
217
218 if state.shutdown {
219 break;
220 }
221
222 let sleep_duration = state.time.realtime_poll_delay(state.tick_duration_us);
223 match timeout(sleep_duration, action_rx.recv()).await {
224 Ok(Some(message)) => process_action_message(message, &mut state).await,
225 Ok(None) => break,
226 Err(_) => {}
227 }
228 }
229
230 let _ = shutdown_tx.send(true);
231 state
232}
233
234fn duration_to_env_ticks(tick_duration_us: u32, raw: &str) -> Result<u64, String> {
235 let duration_us = parse_duration_us(raw).map_err(|err| err.to_string())?;
236 if duration_us == 0 {
237 return Err("schedule period must be greater than zero".to_string());
238 }
239 let tick = u64::from(tick_duration_us.max(1));
240 Ok(duration_us.div_ceil(tick))
241}
242
243fn reset_env_can_state(state: &mut EnvState) {
244 for bus in state.can_buses.values_mut() {
245 let _ = bus.socket.recv_all();
246 bus.latest_frames.clear();
247 for schedule in bus.schedules.values_mut() {
248 schedule.next_due_tick = 0;
249 }
250 }
251}
252
253fn parse_env_frame(
254 state: &EnvState,
255 bus_name: &str,
256 arb_id: u32,
257 data_hex: &str,
258 flags: u8,
259) -> Result<SimCanFrame, String> {
260 let payload = crate::can::parse_data_hex(data_hex)?;
261 let mut data = [0_u8; 64];
262 data[..payload.len()].copy_from_slice(&payload);
263 let frame = SimCanFrame {
264 arb_id,
265 len: payload.len() as u8,
266 flags,
267 data,
268 };
269 validate_env_frame(state, bus_name, &frame)?;
270 Ok(frame)
271}
272
273fn send_env_frame(state: &mut EnvState, bus_name: &str, frame: &SimCanFrame) -> Result<(), String> {
274 validate_env_frame(state, bus_name, frame)?;
275 let bus = state
276 .can_buses
277 .get_mut(bus_name)
278 .ok_or_else(|| format!("env CAN bus '{bus_name}' not found"))?;
279 bus.socket.send(frame)?;
280 record_env_frame(bus, frame);
281 Ok(())
282}
283
284fn observe_env_bus_frames(state: &mut EnvState) -> Result<(), String> {
285 for bus in state.can_buses.values_mut() {
286 for frame in bus.socket.recv_all()? {
287 record_env_frame(bus, &frame);
288 }
289 }
290 Ok(())
291}
292
293fn record_env_frame(bus: &mut EnvCanBusState, frame: &SimCanFrame) {
294 bus.latest_frames
295 .insert(frame_key_from_frame(frame), frame.clone());
296}
297
298fn validate_env_frame(state: &EnvState, bus_name: &str, frame: &SimCanFrame) -> Result<(), String> {
299 let bus = state
300 .can_buses
301 .get(bus_name)
302 .ok_or_else(|| format!("env CAN bus '{bus_name}' not found"))?;
303 crate::can::validate_frame(&bus.name, bus.fd_capable, frame)
304}
305
306fn locate_schedule_mut<'a>(
307 state: &'a mut EnvState,
308 job_id: &str,
309) -> Result<(String, &'a mut CanScheduleJob), String> {
310 for (bus_name, bus) in &mut state.can_buses {
311 if let Some(schedule) = bus.schedules.get_mut(job_id) {
312 return Ok((bus_name.clone(), schedule));
313 }
314 }
315 Err(format!("CAN schedule '{job_id}' not found"))
316}
317
318fn locate_schedule_bus(state: &EnvState, job_id: &str) -> Result<String, String> {
319 state
320 .can_buses
321 .iter()
322 .find(|(_, bus)| bus.schedules.contains_key(job_id))
323 .map(|(bus_name, _)| bus_name.clone())
324 .ok_or_else(|| format!("CAN schedule '{job_id}' not found"))
325}
326
327async fn cleanup_listener_runtime(state: &EnvState) {
328 shutdown_instances(state).await;
329 if state.socket_path.exists() {
330 let _ = std::fs::remove_file(&state.socket_path);
331 }
332 let pid = pid_path(&state.name);
333 if pid.exists() {
334 let _ = std::fs::remove_file(pid);
335 }
336}
337
338fn ensure_unique_schedule_job_id<'a, I>(schedules: I, job_id: &str) -> Result<(), String>
339where
340 I: IntoIterator<Item = &'a BTreeMap<String, CanScheduleJob>>,
341{
342 if schedules
343 .into_iter()
344 .any(|schedule_map| schedule_map.contains_key(job_id))
345 {
346 return Err(format!("CAN schedule '{job_id}' already exists"));
347 }
348 Ok(())
349}
350
351fn frame_data(frame: &SimCanFrame) -> CanFrameData {
352 CanFrameData {
353 arb_id: frame.arb_id,
354 len: frame.len,
355 flags: frame.flags,
356 data_hex: frame
357 .payload()
358 .iter()
359 .map(|byte| format!("{byte:02X}"))
360 .collect::<Vec<_>>()
361 .join(""),
362 }
363}
364
365fn update_schedule(
366 schedule: &mut CanScheduleJob,
367 arb_id: u32,
368 data_hex: String,
369 frame: SimCanFrame,
370 every_ticks: u64,
371 current_tick: u64,
372) {
373 schedule.arb_id = arb_id;
374 schedule.flags = frame.flags;
375 schedule.data_hex = data_hex;
376 schedule.frame = frame;
377 schedule.every_ticks = every_ticks;
378 schedule.next_due_tick = current_tick;
379}
380
381fn start_schedule(schedule: &mut CanScheduleJob) {
382 schedule.enabled = true;
383}
384
385async fn shutdown_instances(state: &EnvState) {
386 let mut pending = Vec::with_capacity(state.instances.len());
387 for instance in &state.instances {
388 if let Some(worker) = state.instance_workers.get(instance)
389 && let Ok(response_rx) = worker.begin_instance_request(InstanceAction::Close).await
390 {
391 pending.push((instance.clone(), response_rx));
392 continue;
393 }
394 if let Some(pid) = read_pid(instance) {
395 let _ = kill_pid(pid);
396 }
397 }
398
399 for (instance, response_rx) in pending {
400 if response_rx.await.ok().and_then(Result::ok).is_none()
401 && let Some(pid) = read_pid(&instance)
402 {
403 let _ = kill_pid(pid);
404 }
405 }
406}
407
#[cfg(test)]
mod tests {
    use super::*;
    use crate::envd::spec::EnvInstanceSpec;
    use crate::load::LoadSpec;
    use crate::protocol::{ResponseData, WorkerAction};
    use serial_test::serial;
    use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
    use tokio::net::UnixListener;

    /// Builds a frame whose payload is `data`, zero-padded to 64 bytes.
    fn frame(arb_id: u32, flags: u8, data: &[u8]) -> SimCanFrame {
        let mut payload = [0_u8; 64];
        payload[..data.len()].copy_from_slice(data);
        SimCanFrame {
            arb_id,
            len: data.len() as u8,
            flags,
            data: payload,
        }
    }

    /// Baseline schedule: `job-1`, 0x123 / `AABB`, every 10 ticks, next due at tick 5.
    fn schedule(enabled: bool) -> CanScheduleJob {
        let original_frame = frame(0x123, 0, &[0xAA, 0xBB]);
        CanScheduleJob {
            job_id: "job-1".to_string(),
            arb_id: original_frame.arb_id,
            flags: original_frame.flags,
            data_hex: "AABB".to_string(),
            frame: original_frame,
            every_ticks: 10,
            next_due_tick: 5,
            enabled,
        }
    }

    /// Restores `AGENT_SIM_HOME` to `original_home`, or unsets it when `None`.
    fn restore_agent_sim_home(original_home: Option<std::ffi::OsString>) {
        // SAFETY: the tests that mutate the environment are `#[serial]`, so they never
        // race each other; the remaining tests are assumed not to read AGENT_SIM_HOME.
        unsafe {
            match original_home {
                Some(value) => std::env::set_var("AGENT_SIM_HOME", value),
                None => std::env::remove_var("AGENT_SIM_HOME"),
            }
        }
    }

    /// Points `AGENT_SIM_HOME` at `home` and restores the previous value on drop.
    ///
    /// Restoring on drop means the variable is reset even when the test panics.
    /// Otherwise it would be left pointing at a deleted temporary directory for every
    /// later test.
    struct AgentSimHomeGuard {
        original_home: Option<std::ffi::OsString>,
    }

    impl AgentSimHomeGuard {
        fn set(home: &std::path::Path) -> Self {
            let original_home = std::env::var_os("AGENT_SIM_HOME");
            // SAFETY: same reasoning as in `restore_agent_sim_home`.
            unsafe {
                std::env::set_var("AGENT_SIM_HOME", home);
            }
            Self { original_home }
        }
    }

    impl Drop for AgentSimHomeGuard {
        fn drop(&mut self) {
            restore_agent_sim_home(self.original_home.take());
        }
    }

    #[test]
    fn schedule_update_preserves_disabled_state() {
        let mut schedule = schedule(false);
        let updated_frame = frame(0x456, CAN_FLAG_EXTENDED, &[0x01, 0x02, 0x03]);

        update_schedule(
            &mut schedule,
            updated_frame.arb_id,
            "010203".to_string(),
            updated_frame,
            42,
            17,
        );

        assert_eq!(schedule.arb_id, 0x456);
        assert_eq!(schedule.flags, CAN_FLAG_EXTENDED);
        assert_eq!(schedule.data_hex, "010203");
        assert_eq!(schedule.every_ticks, 42);
        assert_eq!(schedule.next_due_tick, 17);
        assert!(!schedule.enabled);
        assert_eq!(schedule.frame.len, 3);
        assert_eq!(schedule.frame.payload(), &[0x01, 0x02, 0x03]);
    }

    #[test]
    fn schedule_update_preserves_enabled_state() {
        let mut schedule = schedule(true);
        let updated_frame = frame(0x456, CAN_FLAG_EXTENDED, &[0x01, 0x02, 0x03]);

        update_schedule(
            &mut schedule,
            updated_frame.arb_id,
            "010203".to_string(),
            updated_frame,
            42,
            23,
        );

        assert!(schedule.enabled);
        assert_eq!(schedule.next_due_tick, 23);
        assert_eq!(schedule.frame.payload(), &[0x01, 0x02, 0x03]);
    }

    #[test]
    fn start_schedule_reenables_stopped_schedule() {
        let mut schedule = schedule(false);
        start_schedule(&mut schedule);

        assert!(schedule.enabled);
    }

    #[test]
    fn schedule_job_ids_must_be_unique_across_buses() {
        let mut bus_a = BTreeMap::new();
        let bus_b = BTreeMap::new();
        bus_a.insert("job-1".to_string(), schedule(true));

        let err = ensure_unique_schedule_job_id([&bus_a, &bus_b], "job-1").unwrap_err();

        assert_eq!(err, "CAN schedule 'job-1' already exists");
    }

    #[test]
    fn schedule_job_id_check_allows_new_ids() {
        let mut bus_a = BTreeMap::new();
        let bus_b = BTreeMap::new();
        bus_a.insert("job-1".to_string(), schedule(true));

        let result = ensure_unique_schedule_job_id([&bus_a, &bus_b], "job-2");

        assert!(result.is_ok());
    }

    #[test]
    fn schedule_period_rounds_up_to_avoid_running_faster_than_requested() {
        assert_eq!(
            duration_to_env_ticks(20, "30us").expect("schedule period should convert"),
            2
        );
        assert_eq!(
            duration_to_env_ticks(20, "40us").expect("schedule period should convert"),
            2
        );
    }

    #[tokio::test(flavor = "current_thread")]
    #[serial]
    async fn advance_single_tick_issues_direct_worker_step() {
        let home = tempfile::tempdir().expect("temp AGENT_SIM_HOME should be creatable");
        // Declared after `home`, so it drops first: the variable is restored before the
        // temporary directory is deleted, even if an assertion below panics.
        let _home_guard = AgentSimHomeGuard::set(home.path());

        let instance = "instance-a";
        let socket_path = crate::daemon::lifecycle::socket_path(instance);
        std::fs::create_dir_all(
            socket_path
                .parent()
                .expect("instance socket should have a parent directory"),
        )
        .expect("instance socket parent should be creatable");
        let listener =
            UnixListener::bind(&socket_path).expect("fake instance listener should bind");
        let server = tokio::spawn(async move {
            let (mut stream, _) = listener
                .accept()
                .await
                .expect("fake instance should accept worker-step request");
            let mut line = String::new();
            let mut reader = BufReader::new(&mut stream);
            reader
                .read_line(&mut line)
                .await
                .expect("request should be readable");
            drop(reader);
            let request: Request =
                serde_json::from_str(line.trim_end()).expect("request json should parse");
            assert!(matches!(
                request.action,
                RequestAction::Worker(WorkerAction::Step)
            ));
            let response = Response::ok(request.id, ResponseData::Ack);
            let mut payload = serde_json::to_string(&response).expect("response should serialize");
            payload.push('\n');
            stream
                .write_all(payload.as_bytes())
                .await
                .expect("response should be writable");
        });
        let worker = instance_worker::InstanceWorker::connect(instance)
            .await
            .expect("test worker should connect to fake instance");

        let mut state = EnvState {
            name: "env".to_string(),
            socket_path: PathBuf::new(),
            tick_duration_us: 20,
            instances: vec![instance.to_string()],
            instance_workers: HashMap::from([(instance.to_string(), worker)]),
            time: TimeEngine::default(),
            can_buses: BTreeMap::new(),
            shutdown: false,
        };

        tick::advance_single_tick(&mut state)
            .await
            .expect("worker step should succeed");
        server.await.expect("fake instance task should finish");

        assert_eq!(state.time.status(state.tick_duration_us).elapsed_ticks, 1);
    }

    #[tokio::test(flavor = "current_thread")]
    #[serial]
    async fn bootstrap_instance_detached_removes_temp_file_when_spawn_fails() {
        let home = tempfile::tempdir().expect("temp AGENT_SIM_HOME should be creatable");
        // Declared after `home`, so it drops first (see the test above).
        let _home_guard = AgentSimHomeGuard::set(home.path());

        let instance = EnvInstanceSpec {
            name: "instance-a".to_string(),
            load_spec: LoadSpec {
                libpath: "/tmp/fake-lib.so".to_string(),
                env_tag: Some("env-a".to_string()),
                flash: Vec::new(),
            },
        };
        let missing_exe = home.path().join("missing-bootstrap-binary");
        let err = bootstrap::bootstrap_instance_detached_with_exe(&instance, &missing_exe)
            .await
            .expect_err("missing bootstrap binary should fail");
        assert!(
            err.contains("failed to bootstrap instance 'instance-a'"),
            "unexpected error: {err}"
        );

        let bootstrap_dir = crate::daemon::lifecycle::bootstrap_dir();
        let entries = std::fs::read_dir(&bootstrap_dir)
            .expect("bootstrap dir should exist")
            .collect::<Result<Vec<_>, _>>()
            .expect("bootstrap dir should be readable");
        assert!(
            entries.is_empty(),
            "temp load specs should be cleaned up on spawn failure"
        );
    }
}