1use crate::can::dbc::{DbcRegistry, FrameIdentity};
2use crate::can::{CanSocket, validate_frame};
3use crate::daemon::config::DaemonConfig;
4use crate::ipc::{self, BoxedLocalStream};
5use crate::protocol::{
6 ConnectRequest, ConnectResult, EventDirection, LoadedDbc, MessageEntryKind, MessageListEntry,
7 MessageListRequest, MessageObservation, MessageReadRequest, MessageReadResult,
8 MessageSendRequest, MessageSendResult, MessageStopRequest, Request, RequestAction, Response,
9 ResponseData, Selector, SessionStatus, TraceStartRequest, payload_to_hex,
10};
11use crate::trace::TraceWriter;
12use std::collections::{BTreeMap, HashMap, VecDeque};
13use std::path::Path;
14use std::path::PathBuf;
15use std::time::{Duration, Instant, SystemTime};
16use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, split};
17use tokio::sync::{mpsc, oneshot, watch};
18
/// Upper bound on requests the actor handles per wake-up: the message that woke it plus the
/// follow-up batch drained with `try_recv`.
const MAX_ACTIONS_PER_TURN: usize = 16;
/// Period of the actor's interval timer; every timer tick runs `SessionService::tick`.
const POLL_INTERVAL: Duration = Duration::from_millis(10);
/// Maximum age of an event kept in the in-memory event buffer.
const RETENTION_WINDOW: Duration = Duration::from_secs(60);
/// Maximum number of events kept in the in-memory event buffer.
const RETENTION_EVENT_CAP: usize = 4096;
/// Largest accepted request line in bytes, trailing newline included (64 KiB).
const MAX_REQUEST_LINE_BYTES: usize = 64 * 1024;
24
/// Frame transport driven by [`SessionService`]. The `Send` bound lets a service that owns a
/// backend be moved into a spawned task.
pub trait Backend: Send {
    /// Returns a batch of received frames (possibly empty), or an error message.
    /// The service calls this once per tick.
    fn recv_all(&mut self) -> Result<Vec<crate::frame::CanFrame>, String>;
    /// Transmits one frame, or returns an error message.
    fn send(&mut self, frame: &crate::frame::CanFrame) -> Result<(), String>;
}
29
/// [`Backend`] implementation that forwards every call to an opened [`CanSocket`].
#[derive(Debug)]
struct HardwareBackend {
    /// The socket all receive and send calls are delegated to.
    socket: CanSocket,
}

impl Backend for HardwareBackend {
    /// Delegates to `CanSocket::recv_all`.
    fn recv_all(&mut self) -> Result<Vec<crate::frame::CanFrame>, String> {
        self.socket.recv_all()
    }

    /// Delegates to `CanSocket::send`.
    fn send(&mut self, frame: &crate::frame::CanFrame) -> Result<(), String> {
        self.socket.send(frame)
    }
}
44
/// One frame recorded by the service, received or transmitted.
#[derive(Debug, Clone)]
struct ObservedEvent {
    /// Service-wide sequence number; assigned from `SessionService::next_seq`.
    seq: u64,
    /// Wall-clock time of recording, in milliseconds since the Unix epoch.
    unix_ms: u128,
    /// Monotonic time of recording; used to trim events older than the retention window.
    recorded_at: Instant,
    /// Whether the frame was received or transmitted.
    direction: EventDirection,
    /// The recorded frame itself.
    frame: crate::frame::CanFrame,
}
53
/// Most recent retained events for one frame identity, tracked overall and per direction.
#[derive(Debug, Clone)]
struct LatestObservation {
    /// Newest event regardless of direction.
    latest_any: ObservedEvent,
    /// Newest received event, if any is retained.
    latest_rx: Option<ObservedEvent>,
    /// Newest transmitted event, if any is retained.
    latest_tx: Option<ObservedEvent>,
}
60
61impl LatestObservation {
62 fn visible(&self, include_tx: bool) -> Option<&ObservedEvent> {
63 if include_tx {
64 Some(&self.latest_any)
65 } else {
66 self.latest_rx.as_ref()
67 }
68 }
69
70 fn has_rx(&self) -> bool {
71 self.latest_rx.is_some()
72 }
73
74 fn has_tx(&self) -> bool {
75 self.latest_tx.is_some()
76 }
77}
78
/// A frame that the service retransmits at a fixed period.
#[derive(Debug, Clone)]
struct PeriodicScheduleState {
    /// Target label of the schedule; also the key it is stored under in `SessionService::schedules`.
    target: String,
    /// The already-encoded frame sent on every firing.
    frame: crate::frame::CanFrame,
    /// Requested period in milliseconds.
    periodicity_ms: u64,
    /// Monotonic instant at or after which the frame is sent next.
    next_due: Instant,
}
86
/// State of one connected session: the backend, loaded DBCs, retained traffic, periodic
/// schedules and the optional trace export.
pub struct SessionService<B: Backend> {
    /// Parameters the session was opened with; compared against later connect requests.
    connect: ConnectRequest,
    /// Loaded DBC definitions used for schema, decoding and encoding.
    dbcs: DbcRegistry,
    /// Transport used to receive and send frames.
    backend: B,
    /// Retained events, oldest at the front.
    events: VecDeque<ObservedEvent>,
    /// Newest retained observations per frame identity.
    latest: HashMap<FrameIdentity, LatestObservation>,
    /// Active periodic sends keyed by target label.
    schedules: BTreeMap<String, PeriodicScheduleState>,
    /// Trace export currently receiving recorded events, if any.
    trace: Option<Box<dyn ActiveTrace>>,
    /// Most recent error message; while set, status reports the connection as "degraded".
    backend_error: Option<String>,
    /// Sequence number assigned to the next recorded event (starts at 1).
    next_seq: u64,
    /// Set by a disconnect request; observed through `should_shutdown`.
    shutdown: bool,
}
99
/// A request forwarded from a connection task to the actor, paired with the channel on which
/// the actor sends back the response.
struct ActionMessage {
    /// The parsed client request.
    request: Request,
    /// One-shot channel carrying the response back to the connection task.
    response_tx: oneshot::Sender<Response>,
}
104
/// Object-safe view of a running trace export, so the service can hold a boxed writer
/// (the tests substitute a failing implementation).
trait ActiveTrace: Send {
    /// Path the trace is associated with.
    fn path(&self) -> &Path;
    /// Appends one recorded frame, or returns an error message.
    fn write_event(
        &mut self,
        direction: EventDirection,
        frame: &crate::frame::CanFrame,
    ) -> Result<(), String>;
    /// Consumes the trace and returns its final path, or an error message.
    fn finish(self: Box<Self>) -> Result<PathBuf, String>;
}
114
115impl ActiveTrace for TraceWriter {
116 fn path(&self) -> &Path {
117 self.path()
118 }
119
120 fn write_event(
121 &mut self,
122 direction: EventDirection,
123 frame: &crate::frame::CanFrame,
124 ) -> Result<(), String> {
125 self.write_event(direction, frame)
126 }
127
128 fn finish(self: Box<Self>) -> Result<PathBuf, String> {
129 TraceWriter::finish(*self)
130 }
131}
132
133impl<B: Backend> SessionService<B> {
134 pub fn new(connect: ConnectRequest, dbcs: DbcRegistry, backend: B) -> Self {
135 Self {
136 connect,
137 dbcs,
138 backend,
139 events: VecDeque::new(),
140 latest: HashMap::new(),
141 schedules: BTreeMap::new(),
142 trace: None,
143 backend_error: None,
144 next_seq: 1,
145 shutdown: false,
146 }
147 }
148
149 pub fn handle_request(&mut self, request: Request) -> Response {
150 let id = request.id;
151 let result: Result<ResponseData, String> = match request.action {
152 RequestAction::AdaptersList => Ok(ResponseData::AdaptersList {
153 adapters: crate::can::available_adapters(),
154 }),
155 RequestAction::Connect(connect) => {
156 self.handle_connect(connect).map(ResponseData::Connected)
157 }
158 RequestAction::Disconnect => self.handle_disconnect(),
159 RequestAction::Status => Ok(ResponseData::Status(self.status())),
160 RequestAction::Schema(schema) => schema
161 .filter
162 .as_deref()
163 .map(Selector::parse)
164 .transpose()
165 .map(|filter| ResponseData::Schema {
166 messages: self.dbcs.schema(filter.as_ref()),
167 }),
168 RequestAction::MessageList(request) => self.handle_message_list(request),
169 RequestAction::MessageRead(request) => self.handle_message_read(request),
170 RequestAction::MessageSend(request) => self.handle_message_send(request),
171 RequestAction::MessageStop(request) => self.handle_message_stop(request),
172 RequestAction::TraceStart(request) => self.handle_trace_start(request),
173 RequestAction::TraceStop => self.handle_trace_stop(),
174 };
175 match result {
176 Ok(data) => Response::ok(id, data),
177 Err(err) => Response::err(id, err),
178 }
179 }
180
/// Runs one maintenance pass, in this order: poll the backend for received frames, send any
/// periodic frames that are due, then trim the retained event buffer.
pub fn tick(&mut self) {
    self.poll_backend();
    self.tick_schedules();
    self.trim_events();
}
186
/// Returns true once a disconnect request has been handled.
pub fn should_shutdown(&self) -> bool {
    self.shutdown
}
190
191 fn handle_connect(&self, connect: ConnectRequest) -> Result<ConnectResult, String> {
192 if connect_identity(&connect) == connect_identity(&self.connect) {
193 Ok(ConnectResult {
194 created: false,
195 already_connected: true,
196 status: self.status(),
197 })
198 } else {
199 Err(
200 "session already exists with different open parameters; disconnect first"
201 .to_string(),
202 )
203 }
204 }
205
206 fn handle_disconnect(&mut self) -> Result<ResponseData, String> {
207 if let Err(err) = self.stop_trace() {
208 self.backend_error = Some(err);
209 }
210 self.schedules.clear();
211 self.shutdown = true;
212 Ok(ResponseData::Disconnected)
213 }
214
215 fn handle_message_list(&self, request: MessageListRequest) -> Result<ResponseData, String> {
216 let filter = request.filter.as_deref().map(Selector::parse).transpose()?;
217 let mut out = Vec::new();
218 for latest in self.latest.values() {
219 let Some(last) = latest.visible(request.include_tx) else {
220 continue;
221 };
222 let matches = self.dbcs.matches_for_frame(&last.frame);
223 if matches.is_empty() {
224 if (self.dbcs.is_empty() || request.allow_raw)
225 && filter
226 .as_ref()
227 .is_none_or(|selector| selector.matches_arb_id(last.frame.arb_id))
228 {
229 out.push(MessageListEntry {
230 label: format!("0x{:X}", last.frame.arb_id),
231 kind: MessageEntryKind::Raw,
232 arb_id: last.frame.arb_id,
233 extended: FrameIdentity::from_frame(&last.frame).extended,
234 fd: (last.frame.flags & crate::frame::CAN_FLAG_FD) != 0,
235 len: last.frame.len,
236 last_seen_unix_ms: last.unix_ms,
237 has_rx: latest.has_rx(),
238 has_tx: latest.has_tx(),
239 });
240 }
241 continue;
242 }
243
244 for message in matches {
245 if filter.as_ref().is_some_and(|selector| match selector {
246 Selector::ArbId(arb_id) => *arb_id != message.arb_id,
247 _ => !selector.matches_qualified_name(&message.qualified_name),
248 }) {
249 continue;
250 }
251 out.push(MessageListEntry {
252 label: message.qualified_name.clone(),
253 kind: MessageEntryKind::Semantic,
254 arb_id: message.arb_id,
255 extended: message.extended,
256 fd: message.len > 8,
257 len: last.frame.len,
258 last_seen_unix_ms: last.unix_ms,
259 has_rx: latest.has_rx(),
260 has_tx: latest.has_tx(),
261 });
262 }
263 }
264 out.sort_by(|lhs, rhs| lhs.label.cmp(&rhs.label));
265 Ok(ResponseData::MessageList { messages: out })
266 }
267
268 fn handle_message_read(&self, request: MessageReadRequest) -> Result<ResponseData, String> {
269 let selector = Selector::parse(&request.select)?;
270 let count = request.count.unwrap_or(1);
271 let observations = match selector {
272 Selector::ArbId(arb_id) => self
273 .events
274 .iter()
275 .rev()
276 .filter(|event| {
277 event.frame.arb_id == arb_id && include_direction(event, request.include_tx)
278 })
279 .take(count)
280 .map(raw_observation)
281 .collect::<Vec<_>>(),
282 Selector::SemanticPattern(_) => {
283 let message = self.dbcs.resolve_selector(&selector)?;
284 self.events
285 .iter()
286 .rev()
287 .filter(|event| {
288 FrameIdentity::from_frame(&event.frame)
289 == FrameIdentity::from_message(message)
290 && include_direction(event, request.include_tx)
291 })
292 .take(count)
293 .map(|event| {
294 semantic_observation(
295 event,
296 self.dbcs
297 .decode_selected(&message.qualified_name, &event.frame),
298 )
299 })
300 .collect::<Result<Vec<_>, String>>()?
301 }
302 };
303 if observations.is_empty() {
304 return Err(format!(
305 "selector '{}' matched no observed traffic",
306 request.select
307 ));
308 }
309 Ok(ResponseData::MessageRead(MessageReadResult {
310 selector: request.select,
311 count: observations.len(),
312 observations,
313 }))
314 }
315
316 fn handle_message_send(&mut self, request: MessageSendRequest) -> Result<ResponseData, String> {
317 let selector = Selector::parse(&request.target)?;
318 let (target, frame) = self.dbcs.encode_payload(&selector, &request.data)?;
319 validate_frame(self.connect.fd, &frame)?;
320 if let Some(periodicity_ms) = request.periodicity_ms {
321 self.schedules.insert(
322 target.clone(),
323 PeriodicScheduleState {
324 target: target.clone(),
325 frame: frame.clone(),
326 periodicity_ms,
327 next_due: next_due_after(Instant::now(), periodicity_ms),
328 },
329 );
330 } else {
331 self.backend.send(&frame)?;
332 self.record_event(EventDirection::Tx, frame.clone())?;
333 self.backend_error = None;
334 }
335 Ok(ResponseData::MessageSent(MessageSendResult {
336 target,
337 arb_id: frame.arb_id,
338 extended: (frame.flags & crate::frame::CAN_FLAG_EXTENDED) != 0,
339 fd: (frame.flags & crate::frame::CAN_FLAG_FD) != 0,
340 len: frame.len,
341 periodicity_ms: request.periodicity_ms,
342 }))
343 }
344
345 fn handle_message_stop(&mut self, request: MessageStopRequest) -> Result<ResponseData, String> {
346 let selector = Selector::parse(&request.target)?;
347 let target = match selector {
348 Selector::ArbId(arb_id) => format!("0x{arb_id:X}"),
349 Selector::SemanticPattern(_) => self
350 .dbcs
351 .resolve_selector(&selector)?
352 .qualified_name
353 .clone(),
354 };
355 let stopped = self.schedules.remove(&target).is_some();
356 Ok(ResponseData::MessageStopped { target, stopped })
357 }
358
359 fn handle_trace_start(&mut self, request: TraceStartRequest) -> Result<ResponseData, String> {
360 if self.trace.is_some() {
361 return Err("trace export already active; stop it before starting another".to_string());
362 }
363 let writer = TraceWriter::start(PathBuf::from(&request.path).as_path())?;
364 let path = writer.path().display().to_string();
365 self.trace = Some(Box::new(writer));
366 Ok(ResponseData::TraceStarted { path })
367 }
368
369 fn handle_trace_stop(&mut self) -> Result<ResponseData, String> {
370 let path = self.stop_trace()?;
371 Ok(ResponseData::TraceStopped { path })
372 }
373
374 fn stop_trace(&mut self) -> Result<Option<String>, String> {
375 if let Some(writer) = self.trace.take() {
376 return writer.finish().map(|path| Some(path.display().to_string()));
377 }
378 Ok(None)
379 }
380
381 fn status(&self) -> SessionStatus {
382 let mut periodic_schedules = self
383 .schedules
384 .values()
385 .map(|schedule| crate::protocol::PeriodicSchedule {
386 target: schedule.target.clone(),
387 arb_id: schedule.frame.arb_id,
388 extended: (schedule.frame.flags & crate::frame::CAN_FLAG_EXTENDED) != 0,
389 fd: (schedule.frame.flags & crate::frame::CAN_FLAG_FD) != 0,
390 len: schedule.frame.len,
391 periodicity_ms: schedule.periodicity_ms,
392 })
393 .collect::<Vec<_>>();
394 periodic_schedules.sort_by(|lhs, rhs| lhs.target.cmp(&rhs.target));
395
396 SessionStatus {
397 connection_state: if self.backend_error.is_some() {
398 "degraded".to_string()
399 } else {
400 "connected".to_string()
401 },
402 adapter: self.connect.adapter.clone(),
403 bitrate: self.connect.bitrate,
404 bitrate_data: self.connect.bitrate_data,
405 fd: self.connect.fd,
406 dbcs: self
407 .dbcs
408 .loaded()
409 .iter()
410 .map(|dbc| LoadedDbc {
411 alias: dbc.alias.clone(),
412 path: dbc.path.clone(),
413 })
414 .collect(),
415 trace_path: self
416 .trace
417 .as_ref()
418 .map(|writer| writer.path().display().to_string()),
419 periodic_schedules,
420 backend_error: self.backend_error.clone(),
421 retention_window_secs: RETENTION_WINDOW.as_secs(),
422 retention_event_cap: RETENTION_EVENT_CAP,
423 }
424 }
425
426 fn poll_backend(&mut self) {
427 let mut error = None;
428 match self.backend.recv_all() {
429 Ok(frames) => {
430 for frame in frames {
431 if let Err(err) = validate_frame(self.connect.fd, &frame)
432 .and_then(|_| self.record_event(EventDirection::Rx, frame))
433 {
434 error.get_or_insert(err);
435 }
436 }
437 }
438 Err(err) => error = Some(err),
439 }
440 self.backend_error = error;
441 }
442
443 fn tick_schedules(&mut self) {
444 let now = Instant::now();
445 let due = self
446 .schedules
447 .values()
448 .filter(|schedule| schedule.next_due <= now)
449 .map(|schedule| schedule.target.clone())
450 .collect::<Vec<_>>();
451 let had_due = !due.is_empty();
452 let mut error = None;
453 for target in due {
454 let Some((frame, periodicity_ms)) = self
455 .schedules
456 .get(&target)
457 .map(|schedule| (schedule.frame.clone(), schedule.periodicity_ms))
458 else {
459 continue;
460 };
461 let next_due = next_due_after(now, periodicity_ms);
462 match self.backend.send(&frame) {
463 Ok(()) => {
464 if let Err(err) = self.record_event(EventDirection::Tx, frame) {
465 error.get_or_insert(err);
466 }
467 }
468 Err(err) => {
469 error.get_or_insert(err);
470 }
471 }
472 if let Some(schedule) = self.schedules.get_mut(&target) {
473 schedule.next_due = next_due;
474 }
475 }
476 if had_due {
477 self.backend_error = error;
478 }
479 }
480
481 fn record_event(
482 &mut self,
483 direction: EventDirection,
484 frame: crate::frame::CanFrame,
485 ) -> Result<(), String> {
486 let event = ObservedEvent {
487 seq: self.next_seq,
488 unix_ms: unix_ms_now()?,
489 recorded_at: Instant::now(),
490 direction: direction.clone(),
491 frame,
492 };
493 self.next_seq += 1;
494 let trace_result = if let Some(trace) = self.trace.as_mut() {
495 trace.write_event(direction.clone(), &event.frame)
496 } else {
497 Ok(())
498 };
499 self.events.push_back(event.clone());
500 self.update_latest(&event);
501 self.trim_events();
502 trace_result
503 }
504
505 fn trim_events(&mut self) {
506 let cutoff = Instant::now()
507 .checked_sub(RETENTION_WINDOW)
508 .unwrap_or_else(Instant::now);
509 while self.events.len() > RETENTION_EVENT_CAP
510 || self
511 .events
512 .front()
513 .is_some_and(|event| event.recorded_at < cutoff)
514 {
515 if let Some(event) = self.events.pop_front() {
516 self.reconcile_trimmed_event(&event);
517 }
518 }
519 }
520
521 fn update_latest(&mut self, event: &ObservedEvent) {
522 let entry = self
523 .latest
524 .entry(FrameIdentity::from_frame(&event.frame))
525 .or_insert_with(|| LatestObservation {
526 latest_any: event.clone(),
527 latest_rx: None,
528 latest_tx: None,
529 });
530 entry.latest_any = event.clone();
531 match event.direction {
532 EventDirection::Rx => entry.latest_rx = Some(event.clone()),
533 EventDirection::Tx => entry.latest_tx = Some(event.clone()),
534 }
535 }
536
537 fn reconcile_trimmed_event(&mut self, event: &ObservedEvent) {
538 let identity = FrameIdentity::from_frame(&event.frame);
539 let Some(latest) = self.latest.get(&identity) else {
540 return;
541 };
542 let touched_latest = latest.latest_any.seq == event.seq
543 || latest
544 .latest_rx
545 .as_ref()
546 .is_some_and(|latest_rx| latest_rx.seq == event.seq)
547 || latest
548 .latest_tx
549 .as_ref()
550 .is_some_and(|latest_tx| latest_tx.seq == event.seq);
551 if touched_latest {
552 self.rebuild_latest_for(identity);
553 }
554 }
555
556 fn rebuild_latest_for(&mut self, identity: FrameIdentity) {
557 let mut latest_any = None;
558 let mut latest_rx = None;
559 let mut latest_tx = None;
560
561 for event in self.events.iter().rev() {
562 if FrameIdentity::from_frame(&event.frame) != identity {
563 continue;
564 }
565 if latest_any.is_none() {
566 latest_any = Some(event.clone());
567 }
568 match event.direction {
569 EventDirection::Rx if latest_rx.is_none() => latest_rx = Some(event.clone()),
570 EventDirection::Tx if latest_tx.is_none() => latest_tx = Some(event.clone()),
571 _ => {}
572 }
573 if latest_any.is_some() && latest_rx.is_some() && latest_tx.is_some() {
574 break;
575 }
576 }
577
578 if let Some(latest_any) = latest_any {
579 self.latest.insert(
580 identity,
581 LatestObservation {
582 latest_any,
583 latest_rx,
584 latest_tx,
585 },
586 );
587 } else {
588 self.latest.remove(&identity);
589 }
590 }
591}
592
593pub async fn run_listener(
594 socket_path: PathBuf,
595 config: DaemonConfig,
596) -> Result<(), std::io::Error> {
597 if let Some(parent) = socket_path.parent() {
598 std::fs::create_dir_all(parent)?;
599 }
600
601 let socket = CanSocket::open(
602 &config.connect.adapter,
603 config.connect.bitrate,
604 config.connect.bitrate_data.unwrap_or(0),
605 config.connect.fd,
606 )
607 .map_err(std::io::Error::other)?;
608 let dbcs = DbcRegistry::load(&config.connect.dbcs).map_err(std::io::Error::other)?;
609 let mut listener = ipc::bind_listener(&socket_path).await?;
610 ipc::create_endpoint_marker(&socket_path)?;
611 std::fs::write(
612 crate::daemon::lifecycle::pid_path(),
613 std::process::id().to_string(),
614 )?;
615
616 let state = SessionService::new(config.connect, dbcs, HardwareBackend { socket });
617
618 let (action_tx, action_rx) = mpsc::channel::<ActionMessage>(256);
619 let (shutdown_tx, mut shutdown_rx) = watch::channel(false);
620 let actor_task = tokio::spawn(run_actor_task(state, action_rx, shutdown_tx));
621
622 let mut listener_error = None;
623 loop {
624 tokio::select! {
625 changed = shutdown_rx.changed() => {
626 match changed {
627 Ok(()) if *shutdown_rx.borrow() => break,
628 Ok(()) => {}
629 Err(_) => break,
630 }
631 }
632 accepted = listener.accept() => {
633 match accepted {
634 Ok(stream) => {
635 let action_tx = action_tx.clone();
636 tokio::spawn(async move {
637 let _ = handle_connection(stream, action_tx).await;
638 });
639 }
640 Err(err) => {
641 listener_error = Some(err);
642 break;
643 }
644 }
645 }
646 }
647 }
648
649 drop(action_tx);
650 let _ = actor_task.await;
651 ipc::cleanup_endpoint(&socket_path);
652 let pid_path = crate::daemon::lifecycle::pid_path();
653 if pid_path.exists() {
654 let _ = std::fs::remove_file(pid_path);
655 }
656
657 if let Some(err) = listener_error {
658 return Err(err);
659 }
660 Ok(())
661}
662
663async fn handle_connection(
664 stream: BoxedLocalStream,
665 action_tx: mpsc::Sender<ActionMessage>,
666) -> Result<(), std::io::Error> {
667 let (read_half, mut write_half) = split(stream);
668 let mut reader = BufReader::new(read_half);
669 let mut line = Vec::new();
670
671 loop {
672 let read = read_request_line(&mut reader, &mut line).await?;
673 if read == 0 {
674 return Ok(());
675 }
676 let line = std::str::from_utf8(&line)
677 .map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidData, err))?;
678 let response = match serde_json::from_str::<Request>(line.trim_end()) {
679 Ok(request) => {
680 let request_id = request.id;
681 let (response_tx, response_rx) = oneshot::channel();
682 if action_tx
683 .send(ActionMessage {
684 request,
685 response_tx,
686 })
687 .await
688 .is_err()
689 {
690 Response::err(request_id, "daemon unavailable")
691 } else {
692 match response_rx.await {
693 Ok(response) => response,
694 Err(_) => Response::err(request_id, "daemon unavailable"),
695 }
696 }
697 }
698 Err(err) => Response::err(uuid::Uuid::new_v4(), format!("invalid request json: {err}")),
699 };
700 let mut payload = serde_json::to_string(&response).unwrap_or_else(|err| {
701 format!("{{\"success\":false,\"error\":\"response serialization failed: {err}\"}}")
702 });
703 payload.push('\n');
704 write_half.write_all(payload.as_bytes()).await?;
705 }
706}
707
708async fn read_request_line<R>(
709 reader: &mut BufReader<R>,
710 line: &mut Vec<u8>,
711) -> Result<usize, std::io::Error>
712where
713 R: tokio::io::AsyncRead + Unpin,
714{
715 line.clear();
716 loop {
717 let available = reader.fill_buf().await?;
718 if available.is_empty() {
719 return Ok(line.len());
720 }
721
722 let take = available
723 .iter()
724 .position(|byte| *byte == b'\n')
725 .map(|index| index + 1)
726 .unwrap_or(available.len());
727 if line.len() + take > MAX_REQUEST_LINE_BYTES {
728 return Err(std::io::Error::new(
729 std::io::ErrorKind::InvalidData,
730 format!("request line exceeds {MAX_REQUEST_LINE_BYTES} bytes"),
731 ));
732 }
733 line.extend_from_slice(&available[..take]);
734 reader.consume(take);
735 if line.last() == Some(&b'\n') {
736 return Ok(line.len());
737 }
738 }
739}
740
741async fn run_actor_task<B: Backend>(
742 mut state: SessionService<B>,
743 mut action_rx: mpsc::Receiver<ActionMessage>,
744 shutdown_tx: watch::Sender<bool>,
745) {
746 let mut poller = tokio::time::interval(POLL_INTERVAL);
747 poller.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
748
749 loop {
750 tokio::select! {
751 maybe_message = action_rx.recv() => match maybe_message {
752 Some(message) => {
753 handle_action_message(&mut state, message);
754 process_action_batch(&mut action_rx, &mut state, MAX_ACTIONS_PER_TURN.saturating_sub(1));
755 }
756 None => break,
757 },
758 _ = poller.tick() => state.tick(),
759 }
760
761 if state.should_shutdown() {
762 let _ = shutdown_tx.send(true);
763 break;
764 }
765 }
766}
767
768fn handle_action_message<B: Backend>(state: &mut SessionService<B>, message: ActionMessage) {
769 let response = state.handle_request(message.request);
770 let _ = message.response_tx.send(response);
771}
772
773fn process_action_batch<B: Backend>(
774 action_rx: &mut mpsc::Receiver<ActionMessage>,
775 state: &mut SessionService<B>,
776 limit: usize,
777) {
778 for _ in 0..limit {
779 let Some(message) = action_rx.try_recv().ok() else {
780 break;
781 };
782 handle_action_message(state, message);
783 }
784}
785
/// Returns the instant one period after `now`. A period of 0 ms is treated as 1 ms.
fn next_due_after(now: Instant, periodicity_ms: u64) -> Instant {
    let effective_ms = if periodicity_ms == 0 { 1 } else { periodicity_ms };
    let period = Duration::from_millis(effective_ms);
    now + period
}
789
790fn connect_identity(connect: &ConnectRequest) -> ConnectRequest {
791 let mut normalized = connect.clone();
792 normalized.dbcs.sort();
793 normalized
794}
795
796fn include_direction(event: &ObservedEvent, include_tx: bool) -> bool {
797 include_tx || matches!(event.direction, EventDirection::Rx)
798}
799
800fn raw_observation(event: &ObservedEvent) -> MessageObservation {
801 MessageObservation::Raw {
802 seq: event.seq,
803 direction: event.direction.clone(),
804 unix_ms: event.unix_ms,
805 arb_id: event.frame.arb_id,
806 extended: (event.frame.flags & crate::frame::CAN_FLAG_EXTENDED) != 0,
807 fd: (event.frame.flags & crate::frame::CAN_FLAG_FD) != 0,
808 len: event.frame.len,
809 payload_hex: payload_to_hex(event.frame.payload()),
810 }
811}
812
813fn semantic_observation(
814 event: &ObservedEvent,
815 decoded: Result<crate::can::dbc::DecodedSemanticMessage, String>,
816) -> Result<MessageObservation, String> {
817 let decoded = decoded?;
818 Ok(MessageObservation::Semantic {
819 seq: event.seq,
820 direction: event.direction.clone(),
821 unix_ms: event.unix_ms,
822 qualified_name: decoded.qualified_name,
823 arb_id: decoded.arb_id,
824 extended: decoded.extended,
825 fd: decoded.fd,
826 len: decoded.len,
827 payload_hex: payload_to_hex(event.frame.payload()),
828 signals: decoded.signals,
829 })
830}
831
/// Returns the current wall-clock time in milliseconds since the Unix epoch.
///
/// # Errors
/// Returns a "system clock error" message when the system clock reads earlier than the epoch.
fn unix_ms_now() -> Result<u128, String> {
    match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
        Ok(since_epoch) => Ok(since_epoch.as_millis()),
        Err(err) => Err(format!("system clock error: {err}")),
    }
}
838
839#[cfg(test)]
840mod tests {
841 use super::{
842 ActiveTrace, Backend, EventDirection, MAX_REQUEST_LINE_BYTES, MessageListRequest,
843 RETENTION_WINDOW, Request, RequestAction, ResponseData, SessionService, TraceStartRequest,
844 read_request_line,
845 };
846 use crate::can::dbc::DbcRegistry;
847 use crate::frame::CanFrame;
848 use crate::protocol::{
849 ConnectRequest, DbcSpec, MessagePayload, MessageReadRequest, MessageSendRequest,
850 MessageStopRequest,
851 };
852 use std::collections::{BTreeMap, VecDeque};
853 use std::io::Write;
854 use std::path::{Path, PathBuf};
855 use std::time::{Duration, Instant};
856 use tokio::io::{AsyncWriteExt, BufReader, duplex};
857 use uuid::Uuid;
858
/// Scripted in-memory backend for the service tests.
#[derive(Debug, Default)]
struct MockBackend {
    /// Results handed out by successive `recv_all` calls; an empty batch once exhausted.
    recv_results: VecDeque<Result<Vec<CanFrame>, String>>,
    /// Results handed out by successive `send` calls; success once exhausted.
    send_results: VecDeque<Result<(), String>>,
    /// Every frame whose send succeeded, in order.
    sent: Vec<CanFrame>,
}
865
866 impl Backend for MockBackend {
867 fn recv_all(&mut self) -> Result<Vec<CanFrame>, String> {
868 self.recv_results
869 .pop_front()
870 .unwrap_or_else(|| Ok(Vec::new()))
871 }
872
873 fn send(&mut self, frame: &CanFrame) -> Result<(), String> {
874 if let Some(result) = self.send_results.pop_front() {
875 return result.map(|()| self.sent.push(frame.clone()));
876 }
877 self.sent.push(frame.clone());
878 Ok(())
879 }
880 }
881
/// Test trace that can be configured to fail on write or on finish.
struct FailingTrace {
    /// Path reported by `path()` and returned by a successful `finish()`.
    path: PathBuf,
    /// When set, `finish()` fails with this message.
    finish_error: Option<String>,
    /// When set, every `write_event()` fails with this message.
    write_error: Option<String>,
}
887
888 impl FailingTrace {
889 fn on_finish(message: &str) -> Self {
890 Self {
891 path: PathBuf::from("/tmp/failing.asc"),
892 finish_error: Some(message.to_string()),
893 write_error: None,
894 }
895 }
896
897 fn on_write(message: &str) -> Self {
898 Self {
899 path: PathBuf::from("/tmp/failing.asc"),
900 finish_error: None,
901 write_error: Some(message.to_string()),
902 }
903 }
904 }
905
906 impl ActiveTrace for FailingTrace {
907 fn path(&self) -> &Path {
908 &self.path
909 }
910
911 fn write_event(
912 &mut self,
913 _direction: EventDirection,
914 _frame: &CanFrame,
915 ) -> Result<(), String> {
916 if let Some(err) = &self.write_error {
917 return Err(err.clone());
918 }
919 Ok(())
920 }
921
922 fn finish(self: Box<Self>) -> Result<PathBuf, String> {
923 if let Some(err) = self.finish_error {
924 return Err(err);
925 }
926 Ok(self.path)
927 }
928 }
929
930 fn write_temp_dbc(contents: &str) -> tempfile::NamedTempFile {
931 let mut file = tempfile::NamedTempFile::new().expect("tempfile");
932 write!(file, "{contents}").expect("write dbc");
933 file
934 }
935
936 fn sample_service(overlap: bool) -> SessionService<MockBackend> {
937 let dbc_a = write_temp_dbc(
938 "VERSION \"\"\nBO_ 291 Foo: 8 ECU\n SG_ enable : 0|8@1+ (1,0) [0|1] \"\" ECU\n",
939 );
940 let dbc_b = write_temp_dbc(
941 "VERSION \"\"\nBO_ 291 Bar: 8 ECU\n SG_ enable : 0|8@1+ (1,0) [0|255] \"\" ECU\n",
942 );
943 let dbcs = if overlap {
944 vec![
945 DbcSpec {
946 alias: "a".to_string(),
947 path: dbc_a.path().display().to_string(),
948 },
949 DbcSpec {
950 alias: "b".to_string(),
951 path: dbc_b.path().display().to_string(),
952 },
953 ]
954 } else {
955 vec![DbcSpec {
956 alias: "a".to_string(),
957 path: dbc_a.path().display().to_string(),
958 }]
959 };
960 let registry = DbcRegistry::load(&dbcs).expect("registry");
961 SessionService::new(
962 ConnectRequest {
963 adapter: "usb1".to_string(),
964 bitrate: 500_000,
965 bitrate_data: None,
966 fd: false,
967 dbcs,
968 },
969 registry,
970 MockBackend::default(),
971 )
972 }
973
/// Builds a service over a `MockBackend` with no DBCs loaded (classic CAN, adapter "usb1").
fn empty_service() -> SessionService<MockBackend> {
    SessionService::new(
        ConnectRequest {
            adapter: "usb1".to_string(),
            bitrate: 500_000,
            bitrate_data: None,
            fd: false,
            dbcs: Vec::new(),
        },
        DbcRegistry::empty(),
        MockBackend::default(),
    )
}
987
/// Builds an 8-byte frame with the given id whose first payload byte is `first_byte`.
fn raw_frame(arb_id: u32, first_byte: u8) -> CanFrame {
    raw_frame_with_len(arb_id, first_byte, 8)
}
991
/// Builds a frame with no flags set, the given id and length, and a zeroed 64-byte data
/// buffer whose first byte is `first_byte`.
fn raw_frame_with_len(arb_id: u32, first_byte: u8, len: u8) -> CanFrame {
    let mut data = [0_u8; 64];
    data[0] = first_byte;
    CanFrame {
        arb_id,
        len,
        flags: 0,
        data,
    }
}
1002
/// Re-connecting with identical parameters attaches; a different adapter is rejected.
#[test]
fn connect_same_session_attaches_and_different_session_fails() {
    let service = sample_service(false);

    let attached = service
        .handle_connect(service.connect.clone())
        .expect("same config must attach");
    assert!(attached.already_connected);

    let mut other_adapter = service.connect.clone();
    other_adapter.adapter = "usb2".to_string();
    let err = service
        .handle_connect(other_adapter)
        .expect_err("different config must fail");
    assert!(err.contains("disconnect first"));
}
1019
/// Attaching works for a session without DBCs, and the status lists none.
#[test]
fn connect_succeeds_with_zero_dbcs_loaded() {
    let service = empty_service();
    let own_parameters = service.connect.clone();

    let attached = service
        .handle_connect(own_parameters)
        .expect("same config must attach");

    assert!(attached.already_connected);
    assert!(attached.status.dbcs.is_empty());
}
1029
/// The schema lists DBC messages even before any traffic, while the message list is empty.
#[test]
fn schema_reports_semantic_inventory_while_message_list_stays_empty_without_traffic() {
    let mut service = sample_service(false);

    let schema_request = Request {
        id: Uuid::new_v4(),
        action: RequestAction::Schema(crate::protocol::SchemaRequest { filter: None }),
    };
    let schema = match service
        .handle_request(schema_request)
        .data
        .expect("schema data")
    {
        ResponseData::Schema { messages } => messages,
        _ => panic!("wrong schema response"),
    };
    assert_eq!(schema.len(), 1);
    assert_eq!(schema[0].qualified_name, "a.Foo");

    let list_request = MessageListRequest {
        filter: None,
        allow_raw: false,
        include_tx: false,
    };
    let listed = match service
        .handle_message_list(list_request)
        .expect("message list")
    {
        ResponseData::MessageList { messages } => messages,
        _ => panic!("wrong message list response"),
    };
    assert!(listed.is_empty());
}
1059
/// One observed frame that matches messages in two DBCs yields one entry per message,
/// sorted by label.
#[test]
fn message_list_splits_overlap_entries() {
    let mut service = sample_service(true);
    service
        .record_event(EventDirection::Rx, raw_frame(291, 1))
        .expect("record");

    let request = MessageListRequest {
        filter: None,
        allow_raw: false,
        include_tx: false,
    };
    let messages = match service.handle_message_list(request).expect("message list") {
        ResponseData::MessageList { messages } => messages,
        _ => panic!("wrong response"),
    };

    let labels: Vec<&str> = messages.iter().map(|entry| entry.label.as_str()).collect();
    assert_eq!(labels, ["a.Foo", "b.Bar"]);
}
1080
/// With transmitted frames excluded, the entry reflects the latest received frame (length 8)
/// even though a newer transmitted frame (length 1) exists; both direction flags are reported.
#[test]
fn message_list_excluding_tx_uses_latest_rx_observation() {
    let mut service = sample_service(false);
    let received = raw_frame_with_len(0x123, 1, 8);
    let transmitted = raw_frame_with_len(0x123, 2, 1);
    service
        .record_event(EventDirection::Rx, received)
        .expect("record rx");
    service
        .record_event(EventDirection::Tx, transmitted)
        .expect("record tx");

    let request = MessageListRequest {
        filter: None,
        allow_raw: false,
        include_tx: false,
    };
    let messages = match service.handle_message_list(request).expect("message list") {
        ResponseData::MessageList { messages } => messages,
        _ => panic!("wrong response"),
    };

    assert_eq!(messages.len(), 1);
    let entry = &messages[0];
    assert_eq!(entry.len, 8);
    assert!(entry.has_rx);
    assert!(entry.has_tx);
}
1107
/// Reading by qualified name decodes with the selected DBC alias when two DBCs share an id.
#[test]
fn message_read_decodes_selected_alias() {
    let mut service = sample_service(true);
    service
        .record_event(EventDirection::Rx, raw_frame(291, 7))
        .expect("record");

    let request = MessageReadRequest {
        select: "b.Bar".to_string(),
        count: None,
        include_tx: false,
    };
    let result = match service.handle_message_read(request).expect("message read") {
        ResponseData::MessageRead(result) => result,
        _ => panic!("wrong response"),
    };

    match &result.observations[0] {
        crate::protocol::MessageObservation::Semantic { qualified_name, .. } => {
            assert_eq!(qualified_name, "b.Bar");
        }
        _ => panic!("expected semantic observation"),
    }
}
1131
/// A semantic send with a missing signal is rejected; a raw periodic send is scheduled under
/// its hex target and can then be stopped.
#[test]
fn semantic_send_requires_full_signal_map_and_periodic_stop_works() {
    let dbc = write_temp_dbc(
        "VERSION \"\"\nBO_ 291 Foo: 8 ECU\n SG_ enable : 0|8@1+ (1,0) [0|1] \"\" ECU\n SG_ torque : 8|8@1+ (1,0) [0|255] \"\" ECU\n",
    );
    let dbcs = vec![DbcSpec {
        alias: "a".to_string(),
        path: dbc.path().display().to_string(),
    }];
    let registry = DbcRegistry::load(&dbcs).expect("registry");
    let connect = ConnectRequest {
        adapter: "usb1".to_string(),
        bitrate: 500_000,
        bitrate_data: None,
        fd: false,
        dbcs,
    };
    let mut service = SessionService::new(connect, registry, MockBackend::default());

    // Only one of the two signals is supplied.
    let incomplete = BTreeMap::from([("enable".to_string(), 1.0)]);
    let missing_signal = service
        .handle_message_send(MessageSendRequest {
            target: "a.Foo".to_string(),
            data: MessagePayload::Signals(incomplete),
            periodicity_ms: None,
        })
        .expect_err("missing signal");
    assert!(missing_signal.contains("missing required signal 'torque'"));

    let scheduled = service
        .handle_message_send(MessageSendRequest {
            target: "0x123".to_string(),
            data: MessagePayload::RawHex("DEADBEEF".to_string()),
            periodicity_ms: Some(100),
        })
        .expect("schedule");
    let sent = match scheduled {
        ResponseData::MessageSent(sent) => sent,
        _ => panic!("wrong response"),
    };
    assert_eq!(sent.target, "0x123");

    let stop_response = service
        .handle_message_stop(MessageStopRequest {
            target: "0x123".to_string(),
        })
        .expect("stop");
    let was_stopped = match stop_response {
        ResponseData::MessageStopped { stopped, .. } => stopped,
        _ => panic!("wrong response"),
    };
    assert!(was_stopped);
}
1187
/// Scheduling a second periodic send for the same target must replace the
/// first one: exactly one schedule remains and it carries the second request's
/// period (250) and payload (`BBBB`).
#[test]
fn periodic_send_overwrites_existing_schedule_for_same_target() {
    let mut svc = sample_service(false);

    // (payload hex, period in ms, expect message) for the two consecutive sends.
    let sends = [
        ("AAAA", 100, "initial schedule"),
        ("BBBB", 250, "overwrite schedule"),
    ];
    for (hex, period_ms, what) in sends {
        svc.handle_message_send(MessageSendRequest {
            target: "0x123".to_string(),
            data: MessagePayload::RawHex(hex.to_string()),
            periodicity_ms: Some(period_ms),
        })
        .expect(what);
    }

    assert_eq!(svc.schedules.len(), 1);
    let remaining = svc.schedules.get("0x123").expect("schedule");
    assert_eq!(remaining.periodicity_ms, 250);
    let stored_hex = crate::protocol::payload_to_hex(remaining.frame.payload());
    assert_eq!(stored_hex, "BBBB");
}
1215
/// A periodic semantic send to `a.Foo` must be stored in `schedules` as an
/// already-encoded frame: arbitration id 291, length 8, and payload
/// `010C000000000000` for `enable = 1`, `torque = 12`.
#[test]
fn semantic_periodic_send_stores_raw_encoded_schedule_state() {
    // The temp file handle is kept alive for the whole test.
    let dbc_file = write_temp_dbc(
        "VERSION \"\"\nBO_ 291 Foo: 8 ECU\n SG_ enable : 0|8@1+ (1,0) [0|1] \"\" ECU\n SG_ torque : 8|8@1+ (1,0) [0|255] \"\" ECU\n",
    );
    let dbcs = vec![DbcSpec {
        alias: "a".to_string(),
        path: dbc_file.path().display().to_string(),
    }];
    let registry = DbcRegistry::load(&dbcs).expect("registry");
    let connect = ConnectRequest {
        adapter: "usb1".to_string(),
        bitrate: 500_000,
        bitrate_data: None,
        fd: false,
        dbcs,
    };
    let mut svc = SessionService::new(connect, registry, MockBackend::default());

    let signals = BTreeMap::from([
        ("enable".to_string(), 1.0),
        ("torque".to_string(), 12.0),
    ]);
    let request = MessageSendRequest {
        target: "a.Foo".to_string(),
        data: MessagePayload::Signals(signals),
        periodicity_ms: Some(100),
    };
    svc.handle_message_send(request).expect("semantic schedule");

    let stored = svc.schedules.get("a.Foo").expect("schedule");
    assert_eq!(stored.frame.arb_id, 291);
    assert_eq!(stored.frame.len, 8);
    let stored_hex = crate::protocol::payload_to_hex(stored.frame.payload());
    assert_eq!(stored_hex, "010C000000000000");
}
1257
/// Reading a selector on a service that has recorded no events must fail with
/// an error mentioning "matched no observed traffic".
#[test]
fn message_read_returns_error_when_no_matching_traffic_exists() {
    let svc = sample_service(false);
    let request = MessageReadRequest {
        select: String::from("a.Foo"),
        count: None,
        include_tx: false,
    };

    let failure = svc
        .handle_message_read(request)
        .expect_err("empty traffic must fail");

    assert!(failure.contains("matched no observed traffic"));
}
1270
/// With a trace writer configured to fail on finish, disconnect must still
/// succeed: the service reports shutdown, schedules are cleared, the trace is
/// dropped, and the failure text is kept in `backend_error`.
#[test]
fn disconnect_still_shuts_down_when_trace_finish_fails() {
    let mut svc = sample_service(false);
    svc.trace = Some(Box::new(FailingTrace::on_finish("flush failed")));

    // Seed one pending periodic schedule so that clearing can be observed.
    let target = "0x123".to_string();
    let pending = super::PeriodicScheduleState {
        target: target.clone(),
        frame: raw_frame(0x123, 1),
        periodicity_ms: 100,
        next_due: Instant::now() + Duration::from_millis(100),
    };
    svc.schedules.insert(target, pending);

    let outcome = svc.handle_disconnect().expect("disconnect");

    assert!(matches!(outcome, ResponseData::Disconnected));
    assert!(svc.should_shutdown());
    assert!(svc.schedules.is_empty());
    assert_eq!(svc.backend_error.as_deref(), Some("flush failed"));
    assert!(svc.trace.is_none());
}
1292
/// Walks a periodic schedule through a failed send and a later successful one.
///
/// After the failing tick nothing is recorded as sent, `backend_error` holds
/// the failure text and `next_due` has moved forward. A second tick changes
/// nothing. Once the schedule is forced due again with a successful result
/// queued, one frame is sent and `backend_error` is cleared.
#[test]
fn periodic_send_failure_reschedules_and_clears_degraded_after_recovery() {
    let mut svc = sample_service(false);
    let request = MessageSendRequest {
        target: "0x123".to_string(),
        data: MessagePayload::RawHex("DEADBEEF".to_string()),
        periodicity_ms: Some(50),
    };
    let scheduled = svc.handle_message_send(request).expect("schedule");
    assert!(matches!(scheduled, ResponseData::MessageSent(_)));

    // Force the schedule to be overdue and queue a failing send result.
    let entry = svc.schedules.get_mut("0x123").expect("schedule state");
    entry.next_due = Instant::now() - Duration::from_millis(1);
    svc.backend
        .send_results
        .push_back(Err("tx failed".to_string()));

    svc.tick_schedules();
    assert_eq!(svc.backend.sent.len(), 0);
    assert_eq!(svc.backend_error.as_deref(), Some("tx failed"));
    let rescheduled_due = svc.schedules["0x123"].next_due;
    assert!(rescheduled_due > Instant::now() - Duration::from_millis(1));

    // Ticking again without touching `next_due` leaves the state unchanged.
    svc.tick_schedules();
    assert_eq!(svc.backend.sent.len(), 0);
    assert_eq!(svc.backend_error.as_deref(), Some("tx failed"));

    // Force the schedule overdue once more, this time with a successful result queued.
    let entry = svc.schedules.get_mut("0x123").expect("schedule");
    entry.next_due = Instant::now() - Duration::from_millis(1);
    svc.backend.send_results.push_back(Ok(()));
    svc.tick_schedules();

    assert_eq!(svc.backend.sent.len(), 1);
    assert_eq!(svc.backend_error, None);
}
1333
/// With a trace writer configured to fail on write, ticking an overdue
/// schedule must still send the frame and record the event (both `events` and
/// `latest` are populated), while the trace failure text ends up in
/// `backend_error`.
#[test]
fn periodic_send_trace_errors_surface_without_dropping_event_state() {
    let mut svc = sample_service(false);
    svc.trace = Some(Box::new(FailingTrace::on_write("trace write failed")));

    // Seed a schedule that is already overdue.
    let target = "0x123".to_string();
    let overdue = super::PeriodicScheduleState {
        target: target.clone(),
        frame: raw_frame(0x123, 1),
        periodicity_ms: 25,
        next_due: Instant::now() - Duration::from_millis(1),
    };
    svc.schedules.insert(target, overdue);

    svc.tick_schedules();

    assert_eq!(svc.backend.sent.len(), 1);
    assert_eq!(svc.backend_error.as_deref(), Some("trace write failed"));
    assert_eq!(svc.events.len(), 1);

    let identity = crate::can::dbc::FrameIdentity {
        arb_id: 0x123,
        extended: false,
    };
    assert!(svc.latest.contains_key(&identity));
}
1362
/// An event whose `recorded_at` is older than `RETENTION_WINDOW` must be
/// gone after `tick`, leaving both `events` and `latest` empty.
#[test]
fn tick_trims_expired_events_and_rebuilds_latest_index() {
    let mut svc = sample_service(false);
    svc.record_event(EventDirection::Rx, raw_frame(0x123, 1))
        .expect("record");

    // Backdate the only event to one second past the retention window.
    let expired_at = Instant::now() - RETENTION_WINDOW - Duration::from_secs(1);
    svc.events[0].recorded_at = expired_at;

    svc.tick();

    assert!(svc.events.is_empty());
    assert!(svc.latest.is_empty());
}
1374
/// Queues a failing receive result followed by a successful empty one and
/// ticks twice: the reported connection state must be "degraded" after the
/// first tick and "connected" after the second.
#[test]
fn degraded_state_clears_after_successful_backend_poll() {
    let mut svc = sample_service(false);
    svc.backend
        .recv_results
        .push_back(Err("rx failed".to_string()));
    svc.backend.recv_results.push_back(Ok(Vec::new()));

    for expected_state in ["degraded", "connected"] {
        svc.tick();
        assert_eq!(svc.status().connection_state, expected_state);
    }
}
1388
/// Starts a trace at a path inside a temporary directory and stops it again,
/// checking only the response variants.
///
/// NOTE(review): despite the name, this test never puts the backend into a
/// degraded state and makes no assertion on `status()` — confirm whether
/// those checks were intended.
#[test]
fn trace_start_stop_and_degraded_backend_preserve_status() {
    let mut svc = sample_service(false);

    // The directory handle is kept alive until the end of the test.
    let trace_dir = tempfile::tempdir().expect("trace dir");
    let trace_path = trace_dir.path().join("trace.asc").display().to_string();

    let start_response = svc
        .handle_trace_start(TraceStartRequest { path: trace_path })
        .expect("trace start");
    assert!(matches!(start_response, ResponseData::TraceStarted { .. }));

    let stop_response = svc.handle_trace_stop().expect("trace stop");
    assert!(matches!(stop_response, ResponseData::TraceStopped { .. }));
}
1401
/// A `Status` request with a fresh id, dispatched through `handle_request`,
/// must produce a response flagged as successful.
#[test]
fn request_round_trip_through_service_serializes_current_verbs() {
    let mut svc = sample_service(false);
    let request = Request {
        id: Uuid::new_v4(),
        action: RequestAction::Status,
    };

    let reply = svc.handle_request(request);

    assert!(reply.success);
}
1411
1412 #[tokio::test]
1413 async fn read_request_line_rejects_oversized_requests() {
1414 let (mut writer, reader) = duplex(1024);
1415 let writer_task = tokio::spawn(async move {
1416 let oversized = vec![b'a'; MAX_REQUEST_LINE_BYTES + 1];
1417 let _ = writer.write_all(&oversized).await;
1418 });
1419
1420 let mut reader = BufReader::new(reader);
1421 let mut line = Vec::new();
1422 let err = read_request_line(&mut reader, &mut line)
1423 .await
1424 .expect_err("oversized line must fail");
1425 assert_eq!(err.kind(), std::io::ErrorKind::InvalidData);
1426 drop(reader);
1427 writer_task.await.expect("writer task");
1428 }
1429
/// `next_due_after` must always move the deadline forward: by at least 1 ms
/// for a period of 0, and by at least the period itself for a period of 25.
#[test]
fn next_due_after_never_reuses_due_now_deadline() {
    let now = Instant::now();

    // (period passed in, minimum expected offset from `now` in ms)
    for (period_ms, min_offset_ms) in [(0, 1), (25, 25)] {
        let earliest_allowed = now + Duration::from_millis(min_offset_ms);
        assert!(super::next_due_after(now, period_ms) >= earliest_allowed);
    }
}
1436}