Skip to main content

agent_sim/daemon/
server.rs

1#[path = "server/action_router.rs"]
2mod action_router;
3#[path = "server/can_ops.rs"]
4mod can_ops;
5#[path = "server/shared_ops.rs"]
6mod shared_ops;
7#[path = "server/tick_ops.rs"]
8mod tick_ops;
9
10use crate::can::CanSocket;
11use crate::can::dbc::DbcBusOverlay;
12use crate::protocol::{Request, RequestAction, Response};
13use crate::shared::SharedRegion;
14use crate::sim::error::SimError;
15use crate::sim::project::Project;
16use crate::sim::time::TimeEngine;
17use crate::sim::types::{SignalType, SignalValue, SimCanBusDesc, SimCanFrame, SimSharedDesc};
18use globset::{Glob, GlobMatcher};
19use std::collections::{BTreeSet, HashMap};
20use std::path::PathBuf;
21use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
22use tokio::net::{UnixListener, UnixStream};
23use tokio::sync::{mpsc, oneshot, watch};
24
/// Mutable state of one running instance daemon.
///
/// Created by [`DaemonState::new`] inside `run_listener` and moved into the tick task;
/// request handlers receive it as `&mut DaemonState`.
pub struct DaemonState {
    /// Session name this daemon was started for.
    session: String,
    /// Filesystem path of the Unix socket the daemon listens on.
    socket_path: PathBuf,
    /// Optional environment tag supplied at start-up.
    env: Option<String>,
    /// Loaded project; signal selectors are resolved against it.
    project: Project,
    /// Attached CAN buses, keyed by a string (presumably the bus name — confirm in `can_ops`).
    can_attached: HashMap<String, AttachedCanBus>,
    /// DBC overlays, keyed by a string (presumably the bus name — confirm in `can_ops`).
    dbc_overlays: HashMap<String, DbcBusOverlay>,
    /// Attached shared channels, keyed by a string (presumably the channel name — confirm
    /// in `shared_ops`).
    shared_attached: HashMap<String, AttachedSharedChannel>,
    /// CAN frames stored per string key and then per `u32` key (presumably bus name and
    /// arbitration id — TODO confirm against the code that populates it).
    frame_state: HashMap<String, HashMap<u32, SimCanFrame>>,
    /// Simulation time engine; starts from `TimeEngine::default()`.
    time: TimeEngine,
    /// Shutdown flag; initialised to `false` by `new`.
    shutdown: bool,
}
37
/// A CAN bus entry stored in `DaemonState::can_attached`: the bus description paired with
/// the socket used for it.
struct AttachedCanBus {
    /// Description of the bus.
    meta: SimCanBusDesc,
    /// CAN socket associated with this bus.
    socket: CanSocket,
}
42
/// A shared channel entry stored in `DaemonState::shared_attached`: the channel description
/// paired with its shared region.
struct AttachedSharedChannel {
    /// Description of the shared channel.
    meta: SimSharedDesc,
    /// Shared region backing the channel.
    region: SharedRegion,
    /// Presumably `true` when this daemon is the writing side of the channel — TODO confirm
    /// in `shared_ops`.
    writer: bool,
}
48
/// One client request in flight between a connection task and the tick task.
///
/// `handle_connection` sends it over the `mpsc` action channel and then awaits the reply on
/// the paired `oneshot` receiver.
struct ActionMessage {
    /// The decoded client request.
    request: Request,
    /// Channel on which the response is sent back to the connection task.
    response_tx: oneshot::Sender<Response>,
}
53
54impl DaemonState {
55    pub fn new(
56        session: String,
57        socket_path: PathBuf,
58        project: Project,
59        env: Option<String>,
60    ) -> Self {
61        Self {
62            session,
63            socket_path,
64            env,
65            project,
66            can_attached: HashMap::new(),
67            dbc_overlays: HashMap::new(),
68            shared_attached: HashMap::new(),
69            frame_state: HashMap::new(),
70            time: TimeEngine::default(),
71            shutdown: false,
72        }
73    }
74
75    fn parse_value(signal_type: SignalType, raw: &str) -> Result<SignalValue, SimError> {
76        match signal_type {
77            SignalType::Bool => match raw {
78                "true" | "1" | "True" | "TRUE" => Ok(SignalValue::Bool(true)),
79                "false" | "0" | "False" | "FALSE" => Ok(SignalValue::Bool(false)),
80                _ => Err(SimError::InvalidArg(format!("invalid bool value '{raw}'"))),
81            },
82            SignalType::U32 => raw
83                .parse::<u32>()
84                .map(SignalValue::U32)
85                .map_err(|_| SimError::InvalidArg(format!("invalid u32 value '{raw}'"))),
86            SignalType::I32 => raw
87                .parse::<i32>()
88                .map(SignalValue::I32)
89                .map_err(|_| SimError::InvalidArg(format!("invalid i32 value '{raw}'"))),
90            SignalType::F32 => raw
91                .parse::<f32>()
92                .map(SignalValue::F32)
93                .map_err(|_| SimError::InvalidArg(format!("invalid f32 value '{raw}'"))),
94            SignalType::F64 => raw
95                .parse::<f64>()
96                .map(SignalValue::F64)
97                .map_err(|_| SimError::InvalidArg(format!("invalid f64 value '{raw}'"))),
98        }
99    }
100
101    fn select_signal_ids(
102        project: &Project,
103        selectors: &[String],
104    ) -> Result<Vec<u32>, Box<dyn std::error::Error + Send + Sync>> {
105        if selectors.is_empty() {
106            return Err("missing signal selectors".into());
107        }
108        let mut ids = BTreeSet::new();
109        for selector in selectors {
110            if selector == "*" {
111                ids.extend(project.signals().iter().map(|s| s.id));
112                continue;
113            }
114            if let Some(raw_id) = selector.strip_prefix('#') {
115                let id = raw_id.parse::<u32>()?;
116                if project.signal_by_id(id).is_none() {
117                    return Err(format!("signal not found: '#{id}'").into());
118                }
119                ids.insert(id);
120                continue;
121            }
122            if selector.contains('*') || selector.contains('?') || selector.contains('[') {
123                let matcher = compile_glob(selector)?;
124                let mut matched = false;
125                for signal in project.signals() {
126                    if matcher.is_match(&signal.name) {
127                        ids.insert(signal.id);
128                        matched = true;
129                    }
130                }
131                if !matched {
132                    return Err(format!("signal glob matched nothing: '{selector}'").into());
133                }
134                continue;
135            }
136
137            if let Some(id) = project.signal_id_by_name(selector) {
138                ids.insert(id);
139            } else {
140                return Err(format!("signal not found: '{selector}'").into());
141            }
142        }
143        Ok(ids.into_iter().collect())
144    }
145}
146
147fn compile_glob(pattern: &str) -> Result<GlobMatcher, Box<dyn std::error::Error + Send + Sync>> {
148    Ok(Glob::new(pattern)?.compile_matcher())
149}
150
151pub async fn run_listener(
152    session: String,
153    socket_path: PathBuf,
154    project: Project,
155    env: Option<String>,
156) -> Result<(), std::io::Error> {
157    if socket_path.exists() {
158        let _ = std::fs::remove_file(&socket_path);
159    }
160    if let Some(parent) = socket_path.parent() {
161        std::fs::create_dir_all(parent)?;
162    }
163    let listener = UnixListener::bind(&socket_path)?;
164    let pid_path = crate::daemon::lifecycle::pid_path(&session);
165    std::fs::write(&pid_path, std::process::id().to_string())?;
166    crate::daemon::lifecycle::write_env_tag(&session, env.as_deref())
167        .map_err(std::io::Error::other)?;
168
169    let state = DaemonState::new(session.clone(), socket_path.clone(), project, env);
170    let (action_tx, action_rx) = mpsc::channel::<ActionMessage>(256);
171    let (shutdown_tx, mut shutdown_rx) = watch::channel(false);
172
173    let tick_task = tokio::spawn(tick_ops::run_tick_task(state, action_rx, shutdown_tx));
174    let mut listener_error = None;
175
176    loop {
177        tokio::select! {
178            changed = shutdown_rx.changed() => {
179                match changed {
180                    Ok(()) if *shutdown_rx.borrow() => break,
181                    Ok(()) => {}
182                    Err(_) => break,
183                }
184            }
185            accepted = listener.accept() => {
186                match accepted {
187                    Ok((stream, _addr)) => {
188                        let action_tx = action_tx.clone();
189                        tokio::spawn(async move {
190                            let _ = handle_connection(stream, action_tx).await;
191                        });
192                    }
193                    Err(e) => {
194                        listener_error = Some(e);
195                        break;
196                    }
197                }
198            }
199        }
200    }
201
202    drop(action_tx);
203    let _ = tick_task.await;
204
205    if socket_path.exists() {
206        let _ = std::fs::remove_file(&socket_path);
207    }
208    if pid_path.exists() {
209        let _ = std::fs::remove_file(pid_path);
210    }
211    crate::daemon::lifecycle::remove_env_tag(&session);
212
213    if let Some(err) = listener_error {
214        return Err(err);
215    }
216    Ok(())
217}
218
219async fn handle_connection(
220    stream: UnixStream,
221    action_tx: mpsc::Sender<ActionMessage>,
222) -> Result<(), std::io::Error> {
223    let (read_half, mut write_half) = stream.into_split();
224    let mut reader = BufReader::new(read_half);
225    let mut line = String::new();
226
227    loop {
228        line.clear();
229        let read = reader.read_line(&mut line).await?;
230        if read == 0 {
231            return Ok(());
232        }
233        let response = match serde_json::from_str::<Request>(line.trim_end()) {
234            Ok(request) => {
235                let request_id = request.id;
236                let (response_tx, response_rx) = oneshot::channel();
237                if action_tx
238                    .send(ActionMessage {
239                        request,
240                        response_tx,
241                    })
242                    .await
243                    .is_err()
244                {
245                    Response::err(request_id, "daemon unavailable")
246                } else {
247                    match response_rx.await {
248                        Ok(response) => response,
249                        Err(_) => Response::err(request_id, "daemon unavailable"),
250                    }
251                }
252            }
253            Err(e) => Response {
254                id: uuid::Uuid::new_v4(),
255                success: false,
256                data: None,
257                error: Some(format!("invalid request json: {e}")),
258            },
259        };
260        let mut payload = serde_json::to_string(&response).unwrap_or_else(|e| {
261            format!("{{\"success\":false,\"error\":\"response serialization failed: {e}\"}}")
262        });
263        payload.push('\n');
264        write_half.write_all(payload.as_bytes()).await?;
265    }
266}
267
268async fn process_action_message(message: ActionMessage, state: &mut DaemonState) {
269    let response = handle_action(message.request, state).await;
270    let _ = message.response_tx.send(response);
271}
272
273async fn handle_action(request: Request, state: &mut DaemonState) -> Response {
274    let id = request.id;
275    let result = match request.action {
276        RequestAction::Instance(action) => {
277            action_router::dispatch_instance_action(action, state).await
278        }
279        RequestAction::Worker(action) => action_router::dispatch_worker_action(action, state).await,
280        RequestAction::Env(_) => Err("env-owned action sent to instance daemon".to_string()),
281    };
282
283    match result {
284        Ok(data) => Response::ok(id, data),
285        Err(e) => Response::err(id, e),
286    }
287}