Skip to main content

agent_can/daemon/
server.rs

1use crate::can::{CanSocket, dbc};
2use crate::daemon::config::DaemonConfig;
3use crate::ipc::{self, BoxedLocalStream};
4use crate::protocol::{
5    MailboxMessageData, MailboxSignalData, Request, RequestAction, Response, ResponseData,
6};
7use crate::sim::types::SimCanFrame;
8use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, split};
9use tokio::sync::{mpsc, oneshot, watch};
10use tokio::task::yield_now;
11use tokio::time::{Duration, sleep};
12
13const MAX_ACTIONS_PER_TURN: usize = 16;
14const POLL_INTERVAL: Duration = Duration::from_millis(10);
15
16struct DaemonState {
17    config: DaemonConfig,
18    socket: CanSocket,
19    dbc: dbc::DbcBusOverlay,
20    mailboxes: std::collections::BTreeMap<String, MailboxMessageData>,
21    rx_seen: u64,
22    rx_decoded: u64,
23    rx_dropped: u64,
24    update_seq: u64,
25    shutdown: bool,
26}
27
28struct ActionMessage {
29    request: Request,
30    response_tx: oneshot::Sender<Response>,
31}
32
33impl DaemonState {
34    fn status(&self) -> ResponseData {
35        ResponseData::Status {
36            bus: self.config.bus.clone(),
37            adapter: self.config.adapter.clone(),
38            dbc_path: self.config.dbc_path.clone(),
39            bitrate: self.config.bitrate,
40            bitrate_data: self.config.bitrate_data,
41            fd_capable: self.config.fd_capable,
42            mailbox_count: self.mailboxes.len(),
43            rx_seen: self.rx_seen,
44            rx_decoded: self.rx_decoded,
45            rx_dropped: self.rx_dropped,
46        }
47    }
48}
49
50pub async fn run_listener(
51    socket_path: std::path::PathBuf,
52    config: DaemonConfig,
53) -> Result<(), std::io::Error> {
54    if let Some(parent) = socket_path.parent() {
55        std::fs::create_dir_all(parent)?;
56    }
57
58    let socket = CanSocket::open(
59        &config.adapter,
60        config.bitrate,
61        config.bitrate_data,
62        config.fd_capable,
63    )
64    .map_err(std::io::Error::other)?;
65    let dbc = dbc::DbcBusOverlay::load(std::path::Path::new(&config.dbc_path))
66        .map_err(std::io::Error::other)?;
67    let mut listener = ipc::bind_listener(&socket_path).await?;
68    ipc::create_endpoint_marker(&socket_path)?;
69    std::fs::write(
70        crate::daemon::lifecycle::pid_path(&config.bus),
71        std::process::id().to_string(),
72    )?;
73
74    let state = DaemonState {
75        config,
76        socket,
77        dbc,
78        mailboxes: std::collections::BTreeMap::new(),
79        rx_seen: 0,
80        rx_decoded: 0,
81        rx_dropped: 0,
82        update_seq: 0,
83        shutdown: false,
84    };
85
86    let (action_tx, action_rx) = mpsc::channel::<ActionMessage>(256);
87    let (shutdown_tx, mut shutdown_rx) = watch::channel(false);
88    let actor_task = tokio::spawn(run_actor_task(state, action_rx, shutdown_tx));
89
90    let mut listener_error = None;
91    loop {
92        tokio::select! {
93            changed = shutdown_rx.changed() => {
94                match changed {
95                    Ok(()) if *shutdown_rx.borrow() => break,
96                    Ok(()) => {}
97                    Err(_) => break,
98                }
99            }
100            accepted = listener.accept() => {
101                match accepted {
102                    Ok(stream) => {
103                        let action_tx = action_tx.clone();
104                        tokio::spawn(async move {
105                            let _ = handle_connection(stream, action_tx).await;
106                        });
107                    }
108                    Err(err) => {
109                        listener_error = Some(err);
110                        break;
111                    }
112                }
113            }
114        }
115    }
116
117    drop(action_tx);
118    let _ = actor_task.await;
119    ipc::cleanup_endpoint(&socket_path);
120    let pid_path = crate::daemon::lifecycle::pid_path(
121        socket_path
122            .file_stem()
123            .and_then(|value| value.to_str())
124            .unwrap_or("default"),
125    );
126    if pid_path.exists() {
127        let _ = std::fs::remove_file(pid_path);
128    }
129
130    if let Some(err) = listener_error {
131        return Err(err);
132    }
133    Ok(())
134}
135
136async fn handle_connection(
137    stream: BoxedLocalStream,
138    action_tx: mpsc::Sender<ActionMessage>,
139) -> Result<(), std::io::Error> {
140    let (read_half, mut write_half) = split(stream);
141    let mut reader = BufReader::new(read_half);
142    let mut line = String::new();
143
144    loop {
145        line.clear();
146        let read = reader.read_line(&mut line).await?;
147        if read == 0 {
148            return Ok(());
149        }
150        let response = match serde_json::from_str::<Request>(line.trim_end()) {
151            Ok(request) => {
152                let request_id = request.id;
153                let (response_tx, response_rx) = oneshot::channel();
154                if action_tx
155                    .send(ActionMessage {
156                        request,
157                        response_tx,
158                    })
159                    .await
160                    .is_err()
161                {
162                    Response::err(request_id, "daemon unavailable")
163                } else {
164                    match response_rx.await {
165                        Ok(response) => response,
166                        Err(_) => Response::err(request_id, "daemon unavailable"),
167                    }
168                }
169            }
170            Err(err) => Response::err(uuid::Uuid::new_v4(), format!("invalid request json: {err}")),
171        };
172        let mut payload = serde_json::to_string(&response).unwrap_or_else(|err| {
173            format!("{{\"success\":false,\"error\":\"response serialization failed: {err}\"}}")
174        });
175        payload.push('\n');
176        write_half.write_all(payload.as_bytes()).await?;
177    }
178}
179
180async fn run_actor_task(
181    mut state: DaemonState,
182    mut action_rx: mpsc::Receiver<ActionMessage>,
183    shutdown_tx: watch::Sender<bool>,
184) {
185    loop {
186        process_action_batch(&mut action_rx, &mut state, MAX_ACTIONS_PER_TURN).await;
187
188        if state.shutdown {
189            let _ = shutdown_tx.send(true);
190            break;
191        }
192
193        if let Err(err) = poll_can(&mut state) {
194            tracing::error!("CAN poll failed for bus '{}': {err}", state.config.bus);
195            state.shutdown = true;
196            let _ = shutdown_tx.send(true);
197            break;
198        }
199
200        yield_now().await;
201        sleep(POLL_INTERVAL).await;
202    }
203}
204
205async fn process_action_batch(
206    action_rx: &mut mpsc::Receiver<ActionMessage>,
207    state: &mut DaemonState,
208    limit: usize,
209) {
210    for _ in 0..limit {
211        let Some(message) = action_rx.try_recv().ok() else {
212            break;
213        };
214        let response = handle_action(message.request, state);
215        let _ = message.response_tx.send(response);
216    }
217}
218
219fn handle_action(request: Request, state: &mut DaemonState) -> Response {
220    let id = request.id;
221    let result = match request.action {
222        RequestAction::Status => Ok(state.status()),
223        RequestAction::Mailboxes => Ok(ResponseData::Mailboxes {
224            messages: state.mailboxes.values().cloned().collect(),
225        }),
226        RequestAction::Mailbox { message } => state
227            .mailboxes
228            .get(&message)
229            .cloned()
230            .map(|message| ResponseData::Mailbox { message })
231            .ok_or_else(|| format!("message '{message}' has no decoded mailbox value")),
232        RequestAction::SendRaw {
233            arb_id,
234            data_hex,
235            flags,
236        } => {
237            let payload = crate::can::parse_data_hex(&data_hex);
238            payload.and_then(|payload| {
239                let mut data = [0_u8; 64];
240                data[..payload.len()].copy_from_slice(&payload);
241                let frame = SimCanFrame {
242                    arb_id,
243                    len: payload.len() as u8,
244                    flags: flags.unwrap_or(0),
245                    data,
246                };
247                crate::can::validate_frame(&state.config.bus, state.config.fd_capable, &frame)?;
248                state.socket.send(&frame)?;
249                Ok(ResponseData::SentRaw {
250                    arb_id,
251                    len: frame.len,
252                })
253            })
254        }
255        RequestAction::SendMessage { message, signals } => state
256            .dbc
257            .encode_message(&message, &signals)
258            .and_then(|frame| {
259                crate::can::validate_frame(&state.config.bus, state.config.fd_capable, &frame)?;
260                state.socket.send(&frame)?;
261                Ok(ResponseData::SentMessage {
262                    message,
263                    arb_id: frame.arb_id,
264                    len: frame.len,
265                })
266            }),
267        RequestAction::Close => {
268            state.shutdown = true;
269            Ok(ResponseData::Ack)
270        }
271    };
272
273    match result {
274        Ok(data) => Response::ok(id, data),
275        Err(err) => Response::err(id, err),
276    }
277}
278
279fn poll_can(state: &mut DaemonState) -> Result<(), String> {
280    let frames = state.socket.recv_all()?;
281    for frame in frames {
282        state.rx_seen += 1;
283        crate::can::validate_frame(&state.config.bus, state.config.fd_capable, &frame)?;
284        let Some(decoded) = state.dbc.decode_message(&frame)? else {
285            state.rx_dropped += 1;
286            continue;
287        };
288        state.rx_decoded += 1;
289        state.update_seq += 1;
290        let message = MailboxMessageData {
291            message: decoded.name.clone(),
292            arb_id: frame.arb_id,
293            extended: decoded.extended,
294            len: frame.len,
295            update_seq: state.update_seq,
296            signals: decoded
297                .signals
298                .into_iter()
299                .map(|signal| MailboxSignalData {
300                    name: signal.name,
301                    value: signal.value,
302                    unit: signal.unit,
303                })
304                .collect(),
305        };
306        state.mailboxes.insert(decoded.name, message);
307    }
308    Ok(())
309}