1use crate::can::{CanSocket, dbc};
2use crate::daemon::config::DaemonConfig;
3use crate::ipc::{self, BoxedLocalStream};
4use crate::protocol::{
5 MailboxMessageData, MailboxSignalData, Request, RequestAction, Response, ResponseData,
6};
7use crate::sim::types::SimCanFrame;
8use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, split};
9use tokio::sync::{mpsc, oneshot, watch};
10use tokio::task::yield_now;
11use tokio::time::{Duration, sleep};
12
/// Upper bound on queued client requests the actor handles per loop iteration
/// before it goes back to polling the CAN socket.
const MAX_ACTIONS_PER_TURN: usize = 16;
/// Pause between two consecutive actor loop iterations.
const POLL_INTERVAL: Duration = Duration::from_millis(10);
15
/// All mutable state owned by the actor task of one bus daemon.
struct DaemonState {
    /// Configuration the daemon was started with; reported back by `status`.
    config: DaemonConfig,
    /// Open CAN socket used both for sending frames and for polling received ones.
    socket: CanSocket,
    /// DBC overlay used to encode outgoing messages and decode received frames.
    dbc: dbc::DbcBusOverlay,
    /// Most recent decoded value per message, keyed by the decoded message name.
    mailboxes: std::collections::BTreeMap<String, MailboxMessageData>,
    /// Number of frames taken from the socket by the receive path.
    rx_seen: u64,
    /// Number of received frames that were decoded and stored in a mailbox.
    rx_decoded: u64,
    /// Number of received frames the receive path counted as dropped
    /// (for example frames with no matching DBC message).
    rx_dropped: u64,
    /// Counter bumped for every stored mailbox update and stamped into that update.
    update_seq: u64,
    /// Set once the daemon should stop; checked by the actor loop after each batch.
    shutdown: bool,
}
27
/// One client request travelling from a connection task to the actor task.
struct ActionMessage {
    /// The parsed request to execute.
    request: Request,
    /// One-shot channel on which the actor sends the response back to the connection.
    response_tx: oneshot::Sender<Response>,
}
32
33impl DaemonState {
34 fn status(&self) -> ResponseData {
35 ResponseData::Status {
36 bus: self.config.bus.clone(),
37 adapter: self.config.adapter.clone(),
38 dbc_path: self.config.dbc_path.clone(),
39 bitrate: self.config.bitrate,
40 bitrate_data: self.config.bitrate_data,
41 fd_capable: self.config.fd_capable,
42 mailbox_count: self.mailboxes.len(),
43 rx_seen: self.rx_seen,
44 rx_decoded: self.rx_decoded,
45 rx_dropped: self.rx_dropped,
46 }
47 }
48}
49
50pub async fn run_listener(
51 socket_path: std::path::PathBuf,
52 config: DaemonConfig,
53) -> Result<(), std::io::Error> {
54 if let Some(parent) = socket_path.parent() {
55 std::fs::create_dir_all(parent)?;
56 }
57
58 let socket = CanSocket::open(
59 &config.adapter,
60 config.bitrate,
61 config.bitrate_data,
62 config.fd_capable,
63 )
64 .map_err(std::io::Error::other)?;
65 let dbc = dbc::DbcBusOverlay::load(std::path::Path::new(&config.dbc_path))
66 .map_err(std::io::Error::other)?;
67 let mut listener = ipc::bind_listener(&socket_path).await?;
68 ipc::create_endpoint_marker(&socket_path)?;
69 std::fs::write(
70 crate::daemon::lifecycle::pid_path(&config.bus),
71 std::process::id().to_string(),
72 )?;
73
74 let state = DaemonState {
75 config,
76 socket,
77 dbc,
78 mailboxes: std::collections::BTreeMap::new(),
79 rx_seen: 0,
80 rx_decoded: 0,
81 rx_dropped: 0,
82 update_seq: 0,
83 shutdown: false,
84 };
85
86 let (action_tx, action_rx) = mpsc::channel::<ActionMessage>(256);
87 let (shutdown_tx, mut shutdown_rx) = watch::channel(false);
88 let actor_task = tokio::spawn(run_actor_task(state, action_rx, shutdown_tx));
89
90 let mut listener_error = None;
91 loop {
92 tokio::select! {
93 changed = shutdown_rx.changed() => {
94 match changed {
95 Ok(()) if *shutdown_rx.borrow() => break,
96 Ok(()) => {}
97 Err(_) => break,
98 }
99 }
100 accepted = listener.accept() => {
101 match accepted {
102 Ok(stream) => {
103 let action_tx = action_tx.clone();
104 tokio::spawn(async move {
105 let _ = handle_connection(stream, action_tx).await;
106 });
107 }
108 Err(err) => {
109 listener_error = Some(err);
110 break;
111 }
112 }
113 }
114 }
115 }
116
117 drop(action_tx);
118 let _ = actor_task.await;
119 ipc::cleanup_endpoint(&socket_path);
120 let pid_path = crate::daemon::lifecycle::pid_path(
121 socket_path
122 .file_stem()
123 .and_then(|value| value.to_str())
124 .unwrap_or("default"),
125 );
126 if pid_path.exists() {
127 let _ = std::fs::remove_file(pid_path);
128 }
129
130 if let Some(err) = listener_error {
131 return Err(err);
132 }
133 Ok(())
134}
135
136async fn handle_connection(
137 stream: BoxedLocalStream,
138 action_tx: mpsc::Sender<ActionMessage>,
139) -> Result<(), std::io::Error> {
140 let (read_half, mut write_half) = split(stream);
141 let mut reader = BufReader::new(read_half);
142 let mut line = String::new();
143
144 loop {
145 line.clear();
146 let read = reader.read_line(&mut line).await?;
147 if read == 0 {
148 return Ok(());
149 }
150 let response = match serde_json::from_str::<Request>(line.trim_end()) {
151 Ok(request) => {
152 let request_id = request.id;
153 let (response_tx, response_rx) = oneshot::channel();
154 if action_tx
155 .send(ActionMessage {
156 request,
157 response_tx,
158 })
159 .await
160 .is_err()
161 {
162 Response::err(request_id, "daemon unavailable")
163 } else {
164 match response_rx.await {
165 Ok(response) => response,
166 Err(_) => Response::err(request_id, "daemon unavailable"),
167 }
168 }
169 }
170 Err(err) => Response::err(uuid::Uuid::new_v4(), format!("invalid request json: {err}")),
171 };
172 let mut payload = serde_json::to_string(&response).unwrap_or_else(|err| {
173 format!("{{\"success\":false,\"error\":\"response serialization failed: {err}\"}}")
174 });
175 payload.push('\n');
176 write_half.write_all(payload.as_bytes()).await?;
177 }
178}
179
180async fn run_actor_task(
181 mut state: DaemonState,
182 mut action_rx: mpsc::Receiver<ActionMessage>,
183 shutdown_tx: watch::Sender<bool>,
184) {
185 loop {
186 process_action_batch(&mut action_rx, &mut state, MAX_ACTIONS_PER_TURN).await;
187
188 if state.shutdown {
189 let _ = shutdown_tx.send(true);
190 break;
191 }
192
193 if let Err(err) = poll_can(&mut state) {
194 tracing::error!("CAN poll failed for bus '{}': {err}", state.config.bus);
195 state.shutdown = true;
196 let _ = shutdown_tx.send(true);
197 break;
198 }
199
200 yield_now().await;
201 sleep(POLL_INTERVAL).await;
202 }
203}
204
205async fn process_action_batch(
206 action_rx: &mut mpsc::Receiver<ActionMessage>,
207 state: &mut DaemonState,
208 limit: usize,
209) {
210 for _ in 0..limit {
211 let Some(message) = action_rx.try_recv().ok() else {
212 break;
213 };
214 let response = handle_action(message.request, state);
215 let _ = message.response_tx.send(response);
216 }
217}
218
219fn handle_action(request: Request, state: &mut DaemonState) -> Response {
220 let id = request.id;
221 let result = match request.action {
222 RequestAction::Status => Ok(state.status()),
223 RequestAction::Mailboxes => Ok(ResponseData::Mailboxes {
224 messages: state.mailboxes.values().cloned().collect(),
225 }),
226 RequestAction::Mailbox { message } => state
227 .mailboxes
228 .get(&message)
229 .cloned()
230 .map(|message| ResponseData::Mailbox { message })
231 .ok_or_else(|| format!("message '{message}' has no decoded mailbox value")),
232 RequestAction::SendRaw {
233 arb_id,
234 data_hex,
235 flags,
236 } => {
237 let payload = crate::can::parse_data_hex(&data_hex);
238 payload.and_then(|payload| {
239 let mut data = [0_u8; 64];
240 data[..payload.len()].copy_from_slice(&payload);
241 let frame = SimCanFrame {
242 arb_id,
243 len: payload.len() as u8,
244 flags: flags.unwrap_or(0),
245 data,
246 };
247 crate::can::validate_frame(&state.config.bus, state.config.fd_capable, &frame)?;
248 state.socket.send(&frame)?;
249 Ok(ResponseData::SentRaw {
250 arb_id,
251 len: frame.len,
252 })
253 })
254 }
255 RequestAction::SendMessage { message, signals } => state
256 .dbc
257 .encode_message(&message, &signals)
258 .and_then(|frame| {
259 crate::can::validate_frame(&state.config.bus, state.config.fd_capable, &frame)?;
260 state.socket.send(&frame)?;
261 Ok(ResponseData::SentMessage {
262 message,
263 arb_id: frame.arb_id,
264 len: frame.len,
265 })
266 }),
267 RequestAction::Close => {
268 state.shutdown = true;
269 Ok(ResponseData::Ack)
270 }
271 };
272
273 match result {
274 Ok(data) => Response::ok(id, data),
275 Err(err) => Response::err(id, err),
276 }
277}
278
279fn poll_can(state: &mut DaemonState) -> Result<(), String> {
280 let frames = state.socket.recv_all()?;
281 for frame in frames {
282 state.rx_seen += 1;
283 crate::can::validate_frame(&state.config.bus, state.config.fd_capable, &frame)?;
284 let Some(decoded) = state.dbc.decode_message(&frame)? else {
285 state.rx_dropped += 1;
286 continue;
287 };
288 state.rx_decoded += 1;
289 state.update_seq += 1;
290 let message = MailboxMessageData {
291 message: decoded.name.clone(),
292 arb_id: frame.arb_id,
293 extended: decoded.extended,
294 len: frame.len,
295 update_seq: state.update_seq,
296 signals: decoded
297 .signals
298 .into_iter()
299 .map(|signal| MailboxSignalData {
300 name: signal.name,
301 value: signal.value,
302 unit: signal.unit,
303 })
304 .collect(),
305 };
306 state.mailboxes.insert(decoded.name, message);
307 }
308 Ok(())
309}