Skip to main content

agent_sim/envd/
server.rs

1#[path = "server/bootstrap.rs"]
2mod bootstrap;
3#[path = "server/dispatch.rs"]
4mod dispatch;
5#[path = "server/instance_worker.rs"]
6mod instance_worker;
7#[path = "server/tick.rs"]
8mod tick;
9
10use crate::can::CanSocket;
11use crate::can::dbc::{DbcBusOverlay, frame_key_from_frame};
12use crate::daemon::lifecycle::{kill_pid, read_pid};
13use crate::envd::lifecycle::pid_path;
14use crate::envd::spec::EnvSpec;
15use crate::protocol::{
16    CanFrameData, InstanceAction, Request, RequestAction, Response, parse_duration_us,
17};
18use crate::sim::time::TimeEngine;
19#[cfg(test)]
20use crate::sim::types::CAN_FLAG_EXTENDED;
21use crate::sim::types::SimCanFrame;
22use std::collections::{BTreeMap, HashMap};
23use std::path::PathBuf;
24use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
25use tokio::net::{UnixListener, UnixStream};
26use tokio::sync::{mpsc, oneshot, watch};
27use tokio::time::timeout;
28
29struct EnvState {
30    name: String,
31    socket_path: PathBuf,
32    tick_duration_us: u32,
33    instances: Vec<String>,
34    instance_workers: HashMap<String, instance_worker::InstanceWorker>,
35    time: TimeEngine,
36    can_buses: BTreeMap<String, EnvCanBusState>,
37    shutdown: bool,
38}
39
40struct EnvCanBusState {
41    name: String,
42    vcan_iface: String,
43    fd_capable: bool,
44    bitrate: u32,
45    bitrate_data: u32,
46    socket: CanSocket,
47    dbc: Option<DbcBusOverlay>,
48    latest_frames: HashMap<u32, SimCanFrame>,
49    schedules: BTreeMap<String, CanScheduleJob>,
50}
51
52#[derive(Clone)]
53struct CanScheduleJob {
54    job_id: String,
55    arb_id: u32,
56    flags: u8,
57    data_hex: String,
58    frame: SimCanFrame,
59    every_ticks: u64,
60    next_due_tick: u64,
61    enabled: bool,
62}
63
64struct ActionMessage {
65    request: Request,
66    response_tx: oneshot::Sender<Response>,
67}
68
69pub async fn run_listener(socket_path: PathBuf, env_spec: EnvSpec) -> Result<(), std::io::Error> {
70    if socket_path.exists() {
71        let _ = std::fs::remove_file(&socket_path);
72    }
73    if let Some(parent) = socket_path.parent() {
74        std::fs::create_dir_all(parent)?;
75    }
76
77    let state = EnvState::bootstrap(socket_path.clone(), env_spec)
78        .await
79        .map_err(std::io::Error::other)?;
80    let listener = match UnixListener::bind(&socket_path) {
81        Ok(listener) => listener,
82        Err(err) => {
83            cleanup_listener_runtime(&state).await;
84            return Err(err);
85        }
86    };
87    if let Err(err) = std::fs::write(pid_path(&state.name), std::process::id().to_string()) {
88        cleanup_listener_runtime(&state).await;
89        return Err(err);
90    }
91
92    let (action_tx, action_rx) = mpsc::channel::<ActionMessage>(256);
93    let (shutdown_tx, mut shutdown_rx) = watch::channel(false);
94    let actor_task = tokio::spawn(run_actor_task(state, action_rx, shutdown_tx));
95
96    let mut listener_error = None;
97    loop {
98        tokio::select! {
99            changed = shutdown_rx.changed() => {
100                match changed {
101                    Ok(()) if *shutdown_rx.borrow() => break,
102                    Ok(()) => {}
103                    Err(_) => break,
104                }
105            }
106            accepted = listener.accept() => {
107                match accepted {
108                    Ok((stream, _)) => {
109                        let action_tx = action_tx.clone();
110                        tokio::spawn(async move {
111                            let _ = handle_connection(stream, action_tx).await;
112                        });
113                    }
114                    Err(err) => {
115                        listener_error = Some(err);
116                        break;
117                    }
118                }
119            }
120        }
121    }
122
123    drop(action_tx);
124    let state = actor_task
125        .await
126        .map_err(|err| std::io::Error::other(err.to_string()))?;
127    cleanup_listener_runtime(&state).await;
128    if let Some(err) = listener_error {
129        return Err(err);
130    }
131    Ok(())
132}
133
134async fn handle_connection(
135    stream: UnixStream,
136    action_tx: mpsc::Sender<ActionMessage>,
137) -> Result<(), std::io::Error> {
138    let (read_half, mut write_half) = stream.into_split();
139    let mut reader = BufReader::new(read_half);
140    let mut line = String::new();
141
142    loop {
143        line.clear();
144        let read = reader.read_line(&mut line).await?;
145        if read == 0 {
146            return Ok(());
147        }
148        let response = match serde_json::from_str::<Request>(line.trim_end()) {
149            Ok(request) => {
150                let request_id = request.id;
151                let (response_tx, response_rx) = oneshot::channel();
152                if action_tx
153                    .send(ActionMessage {
154                        request,
155                        response_tx,
156                    })
157                    .await
158                    .is_err()
159                {
160                    Response::err(request_id, "env daemon unavailable")
161                } else {
162                    match response_rx.await {
163                        Ok(response) => response,
164                        Err(_) => Response::err(request_id, "env daemon unavailable"),
165                    }
166                }
167            }
168            Err(err) => Response::err(uuid::Uuid::new_v4(), format!("invalid request json: {err}")),
169        };
170        let mut payload = serde_json::to_string(&response).unwrap_or_else(|err| {
171            format!("{{\"success\":false,\"error\":\"response serialization failed: {err}\"}}")
172        });
173        payload.push('\n');
174        write_half.write_all(payload.as_bytes()).await?;
175    }
176}
177
178async fn process_action_message(message: ActionMessage, state: &mut EnvState) {
179    let response = handle_action(message.request, state).await;
180    let _ = message.response_tx.send(response);
181}
182
183async fn handle_action(request: Request, state: &mut EnvState) -> Response {
184    let id = request.id;
185    let result = match request.action {
186        RequestAction::Env(action) => dispatch::dispatch_action(action, state).await,
187        RequestAction::Instance(_) | RequestAction::Worker(_) => {
188            Err("instance-owned action sent to env daemon".to_string())
189        }
190    };
191
192    match result {
193        Ok(data) => Response::ok(id, data),
194        Err(err) => Response::err(id, err),
195    }
196}
197
198async fn run_actor_task(
199    mut state: EnvState,
200    mut action_rx: mpsc::Receiver<ActionMessage>,
201    shutdown_tx: watch::Sender<bool>,
202) -> EnvState {
203    loop {
204        while let Ok(message) = action_rx.try_recv() {
205            process_action_message(message, &mut state).await;
206        }
207
208        if state.shutdown {
209            break;
210        }
211
212        let due_ticks = state.time.tick_realtime_due(state.tick_duration_us);
213        if let Err(err) = tick::advance_due_ticks(&mut state, due_ticks).await {
214            tracing::error!("env '{}' tick loop failed: {err}", state.name);
215            state.shutdown = true;
216        }
217
218        if state.shutdown {
219            break;
220        }
221
222        let sleep_duration = state.time.realtime_poll_delay(state.tick_duration_us);
223        match timeout(sleep_duration, action_rx.recv()).await {
224            Ok(Some(message)) => process_action_message(message, &mut state).await,
225            Ok(None) => break,
226            Err(_) => {}
227        }
228    }
229
230    let _ = shutdown_tx.send(true);
231    state
232}
233
234fn duration_to_env_ticks(tick_duration_us: u32, raw: &str) -> Result<u64, String> {
235    let duration_us = parse_duration_us(raw).map_err(|err| err.to_string())?;
236    if duration_us == 0 {
237        return Err("schedule period must be greater than zero".to_string());
238    }
239    let tick = u64::from(tick_duration_us.max(1));
240    Ok(duration_us.div_ceil(tick))
241}
242
243fn reset_env_can_state(state: &mut EnvState) {
244    for bus in state.can_buses.values_mut() {
245        let _ = bus.socket.recv_all();
246        bus.latest_frames.clear();
247        for schedule in bus.schedules.values_mut() {
248            schedule.next_due_tick = 0;
249        }
250    }
251}
252
253fn parse_env_frame(
254    state: &EnvState,
255    bus_name: &str,
256    arb_id: u32,
257    data_hex: &str,
258    flags: u8,
259) -> Result<SimCanFrame, String> {
260    let payload = crate::can::parse_data_hex(data_hex)?;
261    let mut data = [0_u8; 64];
262    data[..payload.len()].copy_from_slice(&payload);
263    let frame = SimCanFrame {
264        arb_id,
265        len: payload.len() as u8,
266        flags,
267        data,
268    };
269    validate_env_frame(state, bus_name, &frame)?;
270    Ok(frame)
271}
272
273fn send_env_frame(state: &mut EnvState, bus_name: &str, frame: &SimCanFrame) -> Result<(), String> {
274    validate_env_frame(state, bus_name, frame)?;
275    let bus = state
276        .can_buses
277        .get_mut(bus_name)
278        .ok_or_else(|| format!("env CAN bus '{bus_name}' not found"))?;
279    bus.socket.send(frame)?;
280    record_env_frame(bus, frame);
281    Ok(())
282}
283
284fn observe_env_bus_frames(state: &mut EnvState) -> Result<(), String> {
285    for bus in state.can_buses.values_mut() {
286        for frame in bus.socket.recv_all()? {
287            record_env_frame(bus, &frame);
288        }
289    }
290    Ok(())
291}
292
293fn record_env_frame(bus: &mut EnvCanBusState, frame: &SimCanFrame) {
294    bus.latest_frames
295        .insert(frame_key_from_frame(frame), frame.clone());
296}
297
298fn validate_env_frame(state: &EnvState, bus_name: &str, frame: &SimCanFrame) -> Result<(), String> {
299    let bus = state
300        .can_buses
301        .get(bus_name)
302        .ok_or_else(|| format!("env CAN bus '{bus_name}' not found"))?;
303    crate::can::validate_frame(&bus.name, bus.fd_capable, frame)
304}
305
306fn locate_schedule_mut<'a>(
307    state: &'a mut EnvState,
308    job_id: &str,
309) -> Result<(String, &'a mut CanScheduleJob), String> {
310    for (bus_name, bus) in &mut state.can_buses {
311        if let Some(schedule) = bus.schedules.get_mut(job_id) {
312            return Ok((bus_name.clone(), schedule));
313        }
314    }
315    Err(format!("CAN schedule '{job_id}' not found"))
316}
317
318fn locate_schedule_bus(state: &EnvState, job_id: &str) -> Result<String, String> {
319    state
320        .can_buses
321        .iter()
322        .find(|(_, bus)| bus.schedules.contains_key(job_id))
323        .map(|(bus_name, _)| bus_name.clone())
324        .ok_or_else(|| format!("CAN schedule '{job_id}' not found"))
325}
326
327async fn cleanup_listener_runtime(state: &EnvState) {
328    shutdown_instances(state).await;
329    if state.socket_path.exists() {
330        let _ = std::fs::remove_file(&state.socket_path);
331    }
332    let pid = pid_path(&state.name);
333    if pid.exists() {
334        let _ = std::fs::remove_file(pid);
335    }
336}
337
338fn ensure_unique_schedule_job_id<'a, I>(schedules: I, job_id: &str) -> Result<(), String>
339where
340    I: IntoIterator<Item = &'a BTreeMap<String, CanScheduleJob>>,
341{
342    if schedules
343        .into_iter()
344        .any(|schedule_map| schedule_map.contains_key(job_id))
345    {
346        return Err(format!("CAN schedule '{job_id}' already exists"));
347    }
348    Ok(())
349}
350
351fn frame_data(frame: &SimCanFrame) -> CanFrameData {
352    CanFrameData {
353        arb_id: frame.arb_id,
354        len: frame.len,
355        flags: frame.flags,
356        data_hex: frame
357            .payload()
358            .iter()
359            .map(|byte| format!("{byte:02X}"))
360            .collect::<Vec<_>>()
361            .join(""),
362    }
363}
364
365fn update_schedule(
366    schedule: &mut CanScheduleJob,
367    arb_id: u32,
368    data_hex: String,
369    frame: SimCanFrame,
370    every_ticks: u64,
371    current_tick: u64,
372) {
373    schedule.arb_id = arb_id;
374    schedule.flags = frame.flags;
375    schedule.data_hex = data_hex;
376    schedule.frame = frame;
377    schedule.every_ticks = every_ticks;
378    schedule.next_due_tick = current_tick;
379}
380
381fn start_schedule(schedule: &mut CanScheduleJob) {
382    schedule.enabled = true;
383}
384
385async fn shutdown_instances(state: &EnvState) {
386    let mut pending = Vec::with_capacity(state.instances.len());
387    for instance in &state.instances {
388        if let Some(worker) = state.instance_workers.get(instance)
389            && let Ok(response_rx) = worker.begin_instance_request(InstanceAction::Close).await
390        {
391            pending.push((instance.clone(), response_rx));
392            continue;
393        }
394        if let Some(pid) = read_pid(instance) {
395            let _ = kill_pid(pid);
396        }
397    }
398
399    for (instance, response_rx) in pending {
400        if response_rx.await.ok().and_then(Result::ok).is_none()
401            && let Some(pid) = read_pid(&instance)
402        {
403            let _ = kill_pid(pid);
404        }
405    }
406}
407
#[cfg(test)]
mod tests {
    use super::*;
    use crate::envd::spec::EnvInstanceSpec;
    use crate::load::LoadSpec;
    use crate::protocol::{ResponseData, WorkerAction};
    use serial_test::serial;
    use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
    use tokio::net::UnixListener;

    /// Builds a frame whose payload is `data`, zero-padded to the 64-byte buffer.
    fn frame(arb_id: u32, flags: u8, data: &[u8]) -> SimCanFrame {
        let mut payload = [0_u8; 64];
        payload[..data.len()].copy_from_slice(data);
        SimCanFrame {
            arb_id,
            len: data.len() as u8,
            flags,
            data: payload,
        }
    }

    /// Builds a schedule job fixture with a fixed frame and the requested enabled state.
    fn schedule(enabled: bool) -> CanScheduleJob {
        let original_frame = frame(0x123, 0, &[0xAA, 0xBB]);
        CanScheduleJob {
            job_id: "job-1".to_string(),
            arb_id: original_frame.arb_id,
            flags: original_frame.flags,
            data_hex: "AABB".to_string(),
            frame: original_frame,
            every_ticks: 10,
            next_due_tick: 5,
            enabled,
        }
    }

    /// Puts `AGENT_SIM_HOME` back to `original_home`, removing it when there was no value.
    fn restore_agent_sim_home(original_home: Option<std::ffi::OsString>) {
        if let Some(value) = original_home {
            // SAFETY: the tests that modify AGENT_SIM_HOME are marked `#[serial]`, so these
            // writes do not overlap with each other.
            // NOTE(review): confirm that no non-serial test reads the environment concurrently.
            unsafe {
                std::env::set_var("AGENT_SIM_HOME", value);
            }
        } else {
            // SAFETY: same reasoning as for the `set_var` branch above.
            unsafe {
                std::env::remove_var("AGENT_SIM_HOME");
            }
        }
    }

    /// Points `AGENT_SIM_HOME` at a fresh temporary directory for as long as the guard lives.
    ///
    /// The previous value is restored when the guard is dropped, which also happens while a
    /// failed assertion unwinds, so one failing test cannot leave the variable pointing at a
    /// deleted directory for the tests that run after it.
    struct AgentSimHomeGuard {
        original_home: Option<std::ffi::OsString>,
        home: tempfile::TempDir,
    }

    impl AgentSimHomeGuard {
        /// Creates the temporary home and redirects `AGENT_SIM_HOME` to it.
        fn install() -> Self {
            let home = tempfile::tempdir().expect("temp AGENT_SIM_HOME should be creatable");
            let original_home = std::env::var_os("AGENT_SIM_HOME");
            // SAFETY: the tests that modify AGENT_SIM_HOME are marked `#[serial]`, so these
            // writes do not overlap with each other.
            // NOTE(review): confirm that no non-serial test reads the environment concurrently.
            unsafe {
                std::env::set_var("AGENT_SIM_HOME", home.path());
            }
            Self {
                original_home,
                home,
            }
        }

        /// Path of the temporary home directory.
        fn path(&self) -> &std::path::Path {
            self.home.path()
        }
    }

    impl Drop for AgentSimHomeGuard {
        fn drop(&mut self) {
            restore_agent_sim_home(self.original_home.take());
        }
    }

    #[test]
    fn schedule_update_preserves_disabled_state() {
        let mut schedule = schedule(false);
        let updated_frame = frame(0x456, CAN_FLAG_EXTENDED, &[0x01, 0x02, 0x03]);

        update_schedule(
            &mut schedule,
            updated_frame.arb_id,
            "010203".to_string(),
            updated_frame,
            42,
            17,
        );

        assert_eq!(schedule.arb_id, 0x456);
        assert_eq!(schedule.flags, CAN_FLAG_EXTENDED);
        assert_eq!(schedule.data_hex, "010203");
        assert_eq!(schedule.every_ticks, 42);
        assert_eq!(schedule.next_due_tick, 17);
        assert!(!schedule.enabled);
        assert_eq!(schedule.frame.len, 3);
        assert_eq!(schedule.frame.payload(), &[0x01, 0x02, 0x03]);
    }

    #[test]
    fn schedule_update_preserves_enabled_state() {
        let mut schedule = schedule(true);
        let updated_frame = frame(0x456, CAN_FLAG_EXTENDED, &[0x01, 0x02, 0x03]);

        update_schedule(
            &mut schedule,
            updated_frame.arb_id,
            "010203".to_string(),
            updated_frame,
            42,
            23,
        );

        assert!(schedule.enabled);
        assert_eq!(schedule.next_due_tick, 23);
        assert_eq!(schedule.frame.payload(), &[0x01, 0x02, 0x03]);
    }

    #[test]
    fn start_schedule_reenables_stopped_schedule() {
        let mut schedule = schedule(false);
        start_schedule(&mut schedule);

        assert!(schedule.enabled);
    }

    #[test]
    fn schedule_job_ids_must_be_unique_across_buses() {
        let mut bus_a = BTreeMap::new();
        let bus_b = BTreeMap::new();
        bus_a.insert("job-1".to_string(), schedule(true));

        let err = ensure_unique_schedule_job_id([&bus_a, &bus_b], "job-1").unwrap_err();

        assert_eq!(err, "CAN schedule 'job-1' already exists");
    }

    #[test]
    fn schedule_job_id_check_allows_new_ids() {
        let mut bus_a = BTreeMap::new();
        let bus_b = BTreeMap::new();
        bus_a.insert("job-1".to_string(), schedule(true));

        let result = ensure_unique_schedule_job_id([&bus_a, &bus_b], "job-2");

        assert!(result.is_ok());
    }

    #[test]
    fn schedule_period_rounds_up_to_avoid_running_faster_than_requested() {
        assert_eq!(
            duration_to_env_ticks(20, "30us").expect("schedule period should convert"),
            2
        );
        assert_eq!(
            duration_to_env_ticks(20, "40us").expect("schedule period should convert"),
            2
        );
    }

    #[tokio::test(flavor = "current_thread")]
    #[serial]
    async fn advance_single_tick_issues_direct_worker_step() {
        // Declared first so it is dropped last, after everything that uses the temp home.
        let _home = AgentSimHomeGuard::install();

        let instance = "instance-a";
        let socket_path = crate::daemon::lifecycle::socket_path(instance);
        std::fs::create_dir_all(
            socket_path
                .parent()
                .expect("instance socket should have a parent directory"),
        )
        .expect("instance socket parent should be creatable");
        let listener =
            UnixListener::bind(&socket_path).expect("fake instance listener should bind");
        // Fake instance: accepts one connection, expects a worker step, and acknowledges it.
        let server = tokio::spawn(async move {
            let (mut stream, _) = listener
                .accept()
                .await
                .expect("fake instance should accept worker-step request");
            let mut line = String::new();
            let mut reader = BufReader::new(&mut stream);
            reader
                .read_line(&mut line)
                .await
                .expect("request should be readable");
            drop(reader);
            let request: Request =
                serde_json::from_str(line.trim_end()).expect("request json should parse");
            assert!(matches!(
                request.action,
                RequestAction::Worker(WorkerAction::Step)
            ));
            let response = Response::ok(request.id, ResponseData::Ack);
            let mut payload = serde_json::to_string(&response).expect("response should serialize");
            payload.push('\n');
            stream
                .write_all(payload.as_bytes())
                .await
                .expect("response should be writable");
        });
        let worker = instance_worker::InstanceWorker::connect(instance)
            .await
            .expect("test worker should connect to fake instance");

        let mut state = EnvState {
            name: "env".to_string(),
            socket_path: PathBuf::new(),
            tick_duration_us: 20,
            instances: vec![instance.to_string()],
            instance_workers: HashMap::from([(instance.to_string(), worker)]),
            time: TimeEngine::default(),
            can_buses: BTreeMap::new(),
            shutdown: false,
        };

        tick::advance_single_tick(&mut state)
            .await
            .expect("worker step should succeed");
        server.await.expect("fake instance task should finish");

        assert_eq!(state.time.status(state.tick_duration_us).elapsed_ticks, 1);
    }

    #[tokio::test(flavor = "current_thread")]
    #[serial]
    async fn bootstrap_instance_detached_removes_temp_file_when_spawn_fails() {
        let home = AgentSimHomeGuard::install();

        let instance = EnvInstanceSpec {
            name: "instance-a".to_string(),
            load_spec: LoadSpec {
                libpath: "/tmp/fake-lib.so".to_string(),
                env_tag: Some("env-a".to_string()),
                flash: Vec::new(),
            },
        };
        let missing_exe = home.path().join("missing-bootstrap-binary");
        let err = bootstrap::bootstrap_instance_detached_with_exe(&instance, &missing_exe)
            .await
            .expect_err("missing bootstrap binary should fail");
        assert!(
            err.contains("failed to bootstrap instance 'instance-a'"),
            "unexpected error: {err}"
        );

        let bootstrap_dir = crate::daemon::lifecycle::bootstrap_dir();
        let entries = std::fs::read_dir(&bootstrap_dir)
            .expect("bootstrap dir should exist")
            .collect::<Result<Vec<_>, _>>()
            .expect("bootstrap dir should be readable");
        assert!(
            entries.is_empty(),
            "temp load specs should be cleaned up on spawn failure"
        );
    }
}