Skip to main content

agent_can/daemon/
server.rs

1use crate::can::dbc::{DbcRegistry, FrameIdentity};
2use crate::can::{CanSocket, validate_frame};
3use crate::daemon::config::DaemonConfig;
4use crate::ipc::{self, BoxedLocalStream};
5use crate::protocol::{
6    ConnectRequest, ConnectResult, EventDirection, LoadedDbc, MessageEntryKind, MessageListEntry,
7    MessageListRequest, MessageObservation, MessageReadRequest, MessageReadResult,
8    MessageSendRequest, MessageSendResult, MessageStopRequest, Request, RequestAction, Response,
9    ResponseData, Selector, SessionStatus, TraceStartRequest, payload_to_hex,
10};
11use crate::trace::TraceWriter;
12use std::collections::{BTreeMap, HashMap, VecDeque};
13use std::path::Path;
14use std::path::PathBuf;
15use std::time::{Duration, Instant, SystemTime};
16use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, split};
17use tokio::sync::{mpsc, oneshot, watch};
18
/// Upper bound on the number of requests the actor handles in one loop turn
/// (the first received message plus the follow-up batch).
const MAX_ACTIONS_PER_TURN: usize = 16;
/// Period of the actor's timer; each timer tick calls `SessionService::tick`.
const POLL_INTERVAL: Duration = Duration::from_millis(10);
/// Events older than this are dropped from the in-memory history.
const RETENTION_WINDOW: Duration = Duration::from_secs(60);
/// Maximum number of events kept in the in-memory history.
const RETENTION_EVENT_CAP: usize = 4096;
/// Largest request line, including its trailing newline, accepted from a client.
const MAX_REQUEST_LINE_BYTES: usize = 64 * 1024;
24
/// Frame transport used by [`SessionService`] to receive and transmit CAN frames.
pub trait Backend: Send {
    /// Returns a batch of received frames (possibly empty), or an error message.
    fn recv_all(&mut self) -> Result<Vec<crate::frame::CanFrame>, String>;
    /// Transmits one frame, or returns an error message.
    fn send(&mut self, frame: &crate::frame::CanFrame) -> Result<(), String>;
}
29
/// [`Backend`] implementation that forwards every call to a [`CanSocket`].
#[derive(Debug)]
struct HardwareBackend {
    /// Socket opened by `run_listener` from the daemon configuration.
    socket: CanSocket,
}

impl Backend for HardwareBackend {
    /// Delegates to `CanSocket::recv_all`.
    fn recv_all(&mut self) -> Result<Vec<crate::frame::CanFrame>, String> {
        self.socket.recv_all()
    }

    /// Delegates to `CanSocket::send`.
    fn send(&mut self, frame: &crate::frame::CanFrame) -> Result<(), String> {
        self.socket.send(frame)
    }
}
44
/// One frame seen by the session, either received or transmitted.
#[derive(Debug, Clone)]
struct ObservedEvent {
    /// Session-local sequence number, assigned in recording order.
    seq: u64,
    /// Wall-clock time of recording, in milliseconds since the Unix epoch.
    unix_ms: u128,
    /// Monotonic time of recording; used for retention trimming.
    recorded_at: Instant,
    /// Whether the frame was received (Rx) or transmitted (Tx).
    direction: EventDirection,
    /// The frame itself.
    frame: crate::frame::CanFrame,
}
53
54#[derive(Debug, Clone)]
55struct LatestObservation {
56    latest_any: ObservedEvent,
57    latest_rx: Option<ObservedEvent>,
58    latest_tx: Option<ObservedEvent>,
59}
60
61impl LatestObservation {
62    fn visible(&self, include_tx: bool) -> Option<&ObservedEvent> {
63        if include_tx {
64            Some(&self.latest_any)
65        } else {
66            self.latest_rx.as_ref()
67        }
68    }
69
70    fn has_rx(&self) -> bool {
71        self.latest_rx.is_some()
72    }
73
74    fn has_tx(&self) -> bool {
75        self.latest_tx.is_some()
76    }
77}
78
/// A frame that the session retransmits periodically.
#[derive(Debug, Clone)]
struct PeriodicScheduleState {
    /// Target label; also the key under which the schedule is stored.
    target: String,
    /// Frame sent each time the schedule fires.
    frame: crate::frame::CanFrame,
    /// Requested period in milliseconds.
    periodicity_ms: u64,
    /// Earliest instant at which the frame should be sent next.
    next_due: Instant,
}
86
/// State of one connected session: backend, loaded DBCs, retained traffic,
/// periodic schedules and the optional active trace.
pub struct SessionService<B: Backend> {
    /// Parameters the session was opened with; compared against later connect requests.
    connect: ConnectRequest,
    /// Loaded DBC definitions used for matching, decoding and encoding.
    dbcs: DbcRegistry,
    /// Frame transport.
    backend: B,
    /// Retained events, oldest at the front.
    events: VecDeque<ObservedEvent>,
    /// Newest retained events per frame identity.
    latest: HashMap<FrameIdentity, LatestObservation>,
    /// Periodic transmissions keyed by target label.
    schedules: BTreeMap<String, PeriodicScheduleState>,
    /// Active trace writer, if a trace export is running.
    trace: Option<Box<dyn ActiveTrace>>,
    /// Most recently recorded error; while set, status reports "degraded".
    backend_error: Option<String>,
    /// Sequence number given to the next recorded event.
    next_seq: u64,
    /// Set by a disconnect request; polled via `should_shutdown`.
    shutdown: bool,
}
99
/// A client request handed to the actor task together with its reply channel.
struct ActionMessage {
    /// The parsed request to execute.
    request: Request,
    /// One-shot channel on which the actor sends the response.
    response_tx: oneshot::Sender<Response>,
}
104
/// Abstraction over a running trace export, so the session can hold any writer
/// behind a `Box<dyn ActiveTrace>`.
trait ActiveTrace: Send {
    /// Path reported for the trace while it is active.
    fn path(&self) -> &Path;
    /// Records one frame with its direction.
    fn write_event(
        &mut self,
        direction: EventDirection,
        frame: &crate::frame::CanFrame,
    ) -> Result<(), String>;
    /// Consumes the writer and returns the path reported for the finished trace.
    fn finish(self: Box<Self>) -> Result<PathBuf, String>;
}
114
115impl ActiveTrace for TraceWriter {
116    fn path(&self) -> &Path {
117        self.path()
118    }
119
120    fn write_event(
121        &mut self,
122        direction: EventDirection,
123        frame: &crate::frame::CanFrame,
124    ) -> Result<(), String> {
125        self.write_event(direction, frame)
126    }
127
128    fn finish(self: Box<Self>) -> Result<PathBuf, String> {
129        TraceWriter::finish(*self)
130    }
131}
132
133impl<B: Backend> SessionService<B> {
134    pub fn new(connect: ConnectRequest, dbcs: DbcRegistry, backend: B) -> Self {
135        Self {
136            connect,
137            dbcs,
138            backend,
139            events: VecDeque::new(),
140            latest: HashMap::new(),
141            schedules: BTreeMap::new(),
142            trace: None,
143            backend_error: None,
144            next_seq: 1,
145            shutdown: false,
146        }
147    }
148
149    pub fn handle_request(&mut self, request: Request) -> Response {
150        let id = request.id;
151        let result: Result<ResponseData, String> = match request.action {
152            RequestAction::AdaptersList => Ok(ResponseData::AdaptersList {
153                adapters: crate::can::available_adapters(),
154            }),
155            RequestAction::Connect(connect) => {
156                self.handle_connect(connect).map(ResponseData::Connected)
157            }
158            RequestAction::Disconnect => self.handle_disconnect(),
159            RequestAction::Status => Ok(ResponseData::Status(self.status())),
160            RequestAction::Schema(schema) => schema
161                .filter
162                .as_deref()
163                .map(Selector::parse)
164                .transpose()
165                .map(|filter| ResponseData::Schema {
166                    messages: self.dbcs.schema(filter.as_ref()),
167                }),
168            RequestAction::MessageList(request) => self.handle_message_list(request),
169            RequestAction::MessageRead(request) => self.handle_message_read(request),
170            RequestAction::MessageSend(request) => self.handle_message_send(request),
171            RequestAction::MessageStop(request) => self.handle_message_stop(request),
172            RequestAction::TraceStart(request) => self.handle_trace_start(request),
173            RequestAction::TraceStop => self.handle_trace_stop(),
174        };
175        match result {
176            Ok(data) => Response::ok(id, data),
177            Err(err) => Response::err(id, err),
178        }
179    }
180
181    pub fn tick(&mut self) {
182        self.poll_backend();
183        self.tick_schedules();
184        self.trim_events();
185    }
186
187    pub fn should_shutdown(&self) -> bool {
188        self.shutdown
189    }
190
191    fn handle_connect(&self, connect: ConnectRequest) -> Result<ConnectResult, String> {
192        if connect_identity(&connect) == connect_identity(&self.connect) {
193            Ok(ConnectResult {
194                created: false,
195                already_connected: true,
196                status: self.status(),
197            })
198        } else {
199            Err(
200                "session already exists with different open parameters; disconnect first"
201                    .to_string(),
202            )
203        }
204    }
205
206    fn handle_disconnect(&mut self) -> Result<ResponseData, String> {
207        if let Err(err) = self.stop_trace() {
208            self.backend_error = Some(err);
209        }
210        self.schedules.clear();
211        self.shutdown = true;
212        Ok(ResponseData::Disconnected)
213    }
214
215    fn handle_message_list(&self, request: MessageListRequest) -> Result<ResponseData, String> {
216        let filter = request.filter.as_deref().map(Selector::parse).transpose()?;
217        let mut out = Vec::new();
218        for latest in self.latest.values() {
219            let Some(last) = latest.visible(request.include_tx) else {
220                continue;
221            };
222            let matches = self.dbcs.matches_for_frame(&last.frame);
223            if matches.is_empty() {
224                if (self.dbcs.is_empty() || request.allow_raw)
225                    && filter
226                        .as_ref()
227                        .is_none_or(|selector| selector.matches_arb_id(last.frame.arb_id))
228                {
229                    out.push(MessageListEntry {
230                        label: format!("0x{:X}", last.frame.arb_id),
231                        kind: MessageEntryKind::Raw,
232                        arb_id: last.frame.arb_id,
233                        extended: FrameIdentity::from_frame(&last.frame).extended,
234                        fd: (last.frame.flags & crate::frame::CAN_FLAG_FD) != 0,
235                        len: last.frame.len,
236                        last_seen_unix_ms: last.unix_ms,
237                        has_rx: latest.has_rx(),
238                        has_tx: latest.has_tx(),
239                    });
240                }
241                continue;
242            }
243
244            for message in matches {
245                if filter.as_ref().is_some_and(|selector| match selector {
246                    Selector::ArbId(arb_id) => *arb_id != message.arb_id,
247                    _ => !selector.matches_qualified_name(&message.qualified_name),
248                }) {
249                    continue;
250                }
251                out.push(MessageListEntry {
252                    label: message.qualified_name.clone(),
253                    kind: MessageEntryKind::Semantic,
254                    arb_id: message.arb_id,
255                    extended: message.extended,
256                    fd: message.len > 8,
257                    len: last.frame.len,
258                    last_seen_unix_ms: last.unix_ms,
259                    has_rx: latest.has_rx(),
260                    has_tx: latest.has_tx(),
261                });
262            }
263        }
264        out.sort_by(|lhs, rhs| lhs.label.cmp(&rhs.label));
265        Ok(ResponseData::MessageList { messages: out })
266    }
267
268    fn handle_message_read(&self, request: MessageReadRequest) -> Result<ResponseData, String> {
269        let selector = Selector::parse(&request.select)?;
270        let count = request.count.unwrap_or(1);
271        let observations = match selector {
272            Selector::ArbId(arb_id) => self
273                .events
274                .iter()
275                .rev()
276                .filter(|event| {
277                    event.frame.arb_id == arb_id && include_direction(event, request.include_tx)
278                })
279                .take(count)
280                .map(raw_observation)
281                .collect::<Vec<_>>(),
282            Selector::SemanticPattern(_) => {
283                let message = self.dbcs.resolve_selector(&selector)?;
284                self.events
285                    .iter()
286                    .rev()
287                    .filter(|event| {
288                        FrameIdentity::from_frame(&event.frame)
289                            == FrameIdentity::from_message(message)
290                            && include_direction(event, request.include_tx)
291                    })
292                    .take(count)
293                    .map(|event| {
294                        semantic_observation(
295                            event,
296                            self.dbcs
297                                .decode_selected(&message.qualified_name, &event.frame),
298                        )
299                    })
300                    .collect::<Result<Vec<_>, String>>()?
301            }
302        };
303        if observations.is_empty() {
304            return Err(format!(
305                "selector '{}' matched no observed traffic",
306                request.select
307            ));
308        }
309        Ok(ResponseData::MessageRead(MessageReadResult {
310            selector: request.select,
311            count: observations.len(),
312            observations,
313        }))
314    }
315
316    fn handle_message_send(&mut self, request: MessageSendRequest) -> Result<ResponseData, String> {
317        let selector = Selector::parse(&request.target)?;
318        let (target, frame) = self.dbcs.encode_payload(&selector, &request.data)?;
319        validate_frame(self.connect.fd, &frame)?;
320        if let Some(periodicity_ms) = request.periodicity_ms {
321            self.schedules.insert(
322                target.clone(),
323                PeriodicScheduleState {
324                    target: target.clone(),
325                    frame: frame.clone(),
326                    periodicity_ms,
327                    next_due: next_due_after(Instant::now(), periodicity_ms),
328                },
329            );
330        } else {
331            self.backend.send(&frame)?;
332            self.record_event(EventDirection::Tx, frame.clone())?;
333            self.backend_error = None;
334        }
335        Ok(ResponseData::MessageSent(MessageSendResult {
336            target,
337            arb_id: frame.arb_id,
338            extended: (frame.flags & crate::frame::CAN_FLAG_EXTENDED) != 0,
339            fd: (frame.flags & crate::frame::CAN_FLAG_FD) != 0,
340            len: frame.len,
341            periodicity_ms: request.periodicity_ms,
342        }))
343    }
344
345    fn handle_message_stop(&mut self, request: MessageStopRequest) -> Result<ResponseData, String> {
346        let selector = Selector::parse(&request.target)?;
347        let target = match selector {
348            Selector::ArbId(arb_id) => format!("0x{arb_id:X}"),
349            Selector::SemanticPattern(_) => self
350                .dbcs
351                .resolve_selector(&selector)?
352                .qualified_name
353                .clone(),
354        };
355        let stopped = self.schedules.remove(&target).is_some();
356        Ok(ResponseData::MessageStopped { target, stopped })
357    }
358
359    fn handle_trace_start(&mut self, request: TraceStartRequest) -> Result<ResponseData, String> {
360        if self.trace.is_some() {
361            return Err("trace export already active; stop it before starting another".to_string());
362        }
363        let writer = TraceWriter::start(PathBuf::from(&request.path).as_path())?;
364        let path = writer.path().display().to_string();
365        self.trace = Some(Box::new(writer));
366        Ok(ResponseData::TraceStarted { path })
367    }
368
369    fn handle_trace_stop(&mut self) -> Result<ResponseData, String> {
370        let path = self.stop_trace()?;
371        Ok(ResponseData::TraceStopped { path })
372    }
373
374    fn stop_trace(&mut self) -> Result<Option<String>, String> {
375        if let Some(writer) = self.trace.take() {
376            return writer.finish().map(|path| Some(path.display().to_string()));
377        }
378        Ok(None)
379    }
380
381    fn status(&self) -> SessionStatus {
382        let mut periodic_schedules = self
383            .schedules
384            .values()
385            .map(|schedule| crate::protocol::PeriodicSchedule {
386                target: schedule.target.clone(),
387                arb_id: schedule.frame.arb_id,
388                extended: (schedule.frame.flags & crate::frame::CAN_FLAG_EXTENDED) != 0,
389                fd: (schedule.frame.flags & crate::frame::CAN_FLAG_FD) != 0,
390                len: schedule.frame.len,
391                periodicity_ms: schedule.periodicity_ms,
392            })
393            .collect::<Vec<_>>();
394        periodic_schedules.sort_by(|lhs, rhs| lhs.target.cmp(&rhs.target));
395
396        SessionStatus {
397            connection_state: if self.backend_error.is_some() {
398                "degraded".to_string()
399            } else {
400                "connected".to_string()
401            },
402            adapter: self.connect.adapter.clone(),
403            bitrate: self.connect.bitrate,
404            bitrate_data: self.connect.bitrate_data,
405            fd: self.connect.fd,
406            dbcs: self
407                .dbcs
408                .loaded()
409                .iter()
410                .map(|dbc| LoadedDbc {
411                    alias: dbc.alias.clone(),
412                    path: dbc.path.clone(),
413                })
414                .collect(),
415            trace_path: self
416                .trace
417                .as_ref()
418                .map(|writer| writer.path().display().to_string()),
419            periodic_schedules,
420            backend_error: self.backend_error.clone(),
421            retention_window_secs: RETENTION_WINDOW.as_secs(),
422            retention_event_cap: RETENTION_EVENT_CAP,
423        }
424    }
425
426    fn poll_backend(&mut self) {
427        let mut error = None;
428        match self.backend.recv_all() {
429            Ok(frames) => {
430                for frame in frames {
431                    if let Err(err) = validate_frame(self.connect.fd, &frame)
432                        .and_then(|_| self.record_event(EventDirection::Rx, frame))
433                    {
434                        error.get_or_insert(err);
435                    }
436                }
437            }
438            Err(err) => error = Some(err),
439        }
440        self.backend_error = error;
441    }
442
443    fn tick_schedules(&mut self) {
444        let now = Instant::now();
445        let due = self
446            .schedules
447            .values()
448            .filter(|schedule| schedule.next_due <= now)
449            .map(|schedule| schedule.target.clone())
450            .collect::<Vec<_>>();
451        let had_due = !due.is_empty();
452        let mut error = None;
453        for target in due {
454            let Some((frame, periodicity_ms)) = self
455                .schedules
456                .get(&target)
457                .map(|schedule| (schedule.frame.clone(), schedule.periodicity_ms))
458            else {
459                continue;
460            };
461            let next_due = next_due_after(now, periodicity_ms);
462            match self.backend.send(&frame) {
463                Ok(()) => {
464                    if let Err(err) = self.record_event(EventDirection::Tx, frame) {
465                        error.get_or_insert(err);
466                    }
467                }
468                Err(err) => {
469                    error.get_or_insert(err);
470                }
471            }
472            if let Some(schedule) = self.schedules.get_mut(&target) {
473                schedule.next_due = next_due;
474            }
475        }
476        if had_due {
477            self.backend_error = error;
478        }
479    }
480
481    fn record_event(
482        &mut self,
483        direction: EventDirection,
484        frame: crate::frame::CanFrame,
485    ) -> Result<(), String> {
486        let event = ObservedEvent {
487            seq: self.next_seq,
488            unix_ms: unix_ms_now()?,
489            recorded_at: Instant::now(),
490            direction: direction.clone(),
491            frame,
492        };
493        self.next_seq += 1;
494        let trace_result = if let Some(trace) = self.trace.as_mut() {
495            trace.write_event(direction.clone(), &event.frame)
496        } else {
497            Ok(())
498        };
499        self.events.push_back(event.clone());
500        self.update_latest(&event);
501        self.trim_events();
502        trace_result
503    }
504
505    fn trim_events(&mut self) {
506        let cutoff = Instant::now()
507            .checked_sub(RETENTION_WINDOW)
508            .unwrap_or_else(Instant::now);
509        while self.events.len() > RETENTION_EVENT_CAP
510            || self
511                .events
512                .front()
513                .is_some_and(|event| event.recorded_at < cutoff)
514        {
515            if let Some(event) = self.events.pop_front() {
516                self.reconcile_trimmed_event(&event);
517            }
518        }
519    }
520
521    fn update_latest(&mut self, event: &ObservedEvent) {
522        let entry = self
523            .latest
524            .entry(FrameIdentity::from_frame(&event.frame))
525            .or_insert_with(|| LatestObservation {
526                latest_any: event.clone(),
527                latest_rx: None,
528                latest_tx: None,
529            });
530        entry.latest_any = event.clone();
531        match event.direction {
532            EventDirection::Rx => entry.latest_rx = Some(event.clone()),
533            EventDirection::Tx => entry.latest_tx = Some(event.clone()),
534        }
535    }
536
537    fn reconcile_trimmed_event(&mut self, event: &ObservedEvent) {
538        let identity = FrameIdentity::from_frame(&event.frame);
539        let Some(latest) = self.latest.get(&identity) else {
540            return;
541        };
542        let touched_latest = latest.latest_any.seq == event.seq
543            || latest
544                .latest_rx
545                .as_ref()
546                .is_some_and(|latest_rx| latest_rx.seq == event.seq)
547            || latest
548                .latest_tx
549                .as_ref()
550                .is_some_and(|latest_tx| latest_tx.seq == event.seq);
551        if touched_latest {
552            self.rebuild_latest_for(identity);
553        }
554    }
555
556    fn rebuild_latest_for(&mut self, identity: FrameIdentity) {
557        let mut latest_any = None;
558        let mut latest_rx = None;
559        let mut latest_tx = None;
560
561        for event in self.events.iter().rev() {
562            if FrameIdentity::from_frame(&event.frame) != identity {
563                continue;
564            }
565            if latest_any.is_none() {
566                latest_any = Some(event.clone());
567            }
568            match event.direction {
569                EventDirection::Rx if latest_rx.is_none() => latest_rx = Some(event.clone()),
570                EventDirection::Tx if latest_tx.is_none() => latest_tx = Some(event.clone()),
571                _ => {}
572            }
573            if latest_any.is_some() && latest_rx.is_some() && latest_tx.is_some() {
574                break;
575            }
576        }
577
578        if let Some(latest_any) = latest_any {
579            self.latest.insert(
580                identity,
581                LatestObservation {
582                    latest_any,
583                    latest_rx,
584                    latest_tx,
585                },
586            );
587        } else {
588            self.latest.remove(&identity);
589        }
590    }
591}
592
593pub async fn run_listener(
594    socket_path: PathBuf,
595    config: DaemonConfig,
596) -> Result<(), std::io::Error> {
597    if let Some(parent) = socket_path.parent() {
598        std::fs::create_dir_all(parent)?;
599    }
600
601    let socket = CanSocket::open(
602        &config.connect.adapter,
603        config.connect.bitrate,
604        config.connect.bitrate_data.unwrap_or(0),
605        config.connect.fd,
606    )
607    .map_err(std::io::Error::other)?;
608    let dbcs = DbcRegistry::load(&config.connect.dbcs).map_err(std::io::Error::other)?;
609    let mut listener = ipc::bind_listener(&socket_path).await?;
610    ipc::create_endpoint_marker(&socket_path)?;
611    std::fs::write(
612        crate::daemon::lifecycle::pid_path(),
613        std::process::id().to_string(),
614    )?;
615
616    let state = SessionService::new(config.connect, dbcs, HardwareBackend { socket });
617
618    let (action_tx, action_rx) = mpsc::channel::<ActionMessage>(256);
619    let (shutdown_tx, mut shutdown_rx) = watch::channel(false);
620    let actor_task = tokio::spawn(run_actor_task(state, action_rx, shutdown_tx));
621
622    let mut listener_error = None;
623    loop {
624        tokio::select! {
625            changed = shutdown_rx.changed() => {
626                match changed {
627                    Ok(()) if *shutdown_rx.borrow() => break,
628                    Ok(()) => {}
629                    Err(_) => break,
630                }
631            }
632            accepted = listener.accept() => {
633                match accepted {
634                    Ok(stream) => {
635                        let action_tx = action_tx.clone();
636                        tokio::spawn(async move {
637                            let _ = handle_connection(stream, action_tx).await;
638                        });
639                    }
640                    Err(err) => {
641                        listener_error = Some(err);
642                        break;
643                    }
644                }
645            }
646        }
647    }
648
649    drop(action_tx);
650    let _ = actor_task.await;
651    ipc::cleanup_endpoint(&socket_path);
652    let pid_path = crate::daemon::lifecycle::pid_path();
653    if pid_path.exists() {
654        let _ = std::fs::remove_file(pid_path);
655    }
656
657    if let Some(err) = listener_error {
658        return Err(err);
659    }
660    Ok(())
661}
662
663async fn handle_connection(
664    stream: BoxedLocalStream,
665    action_tx: mpsc::Sender<ActionMessage>,
666) -> Result<(), std::io::Error> {
667    let (read_half, mut write_half) = split(stream);
668    let mut reader = BufReader::new(read_half);
669    let mut line = Vec::new();
670
671    loop {
672        let read = read_request_line(&mut reader, &mut line).await?;
673        if read == 0 {
674            return Ok(());
675        }
676        let line = std::str::from_utf8(&line)
677            .map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidData, err))?;
678        let response = match serde_json::from_str::<Request>(line.trim_end()) {
679            Ok(request) => {
680                let request_id = request.id;
681                let (response_tx, response_rx) = oneshot::channel();
682                if action_tx
683                    .send(ActionMessage {
684                        request,
685                        response_tx,
686                    })
687                    .await
688                    .is_err()
689                {
690                    Response::err(request_id, "daemon unavailable")
691                } else {
692                    match response_rx.await {
693                        Ok(response) => response,
694                        Err(_) => Response::err(request_id, "daemon unavailable"),
695                    }
696                }
697            }
698            Err(err) => Response::err(uuid::Uuid::new_v4(), format!("invalid request json: {err}")),
699        };
700        let mut payload = serde_json::to_string(&response).unwrap_or_else(|err| {
701            format!("{{\"success\":false,\"error\":\"response serialization failed: {err}\"}}")
702        });
703        payload.push('\n');
704        write_half.write_all(payload.as_bytes()).await?;
705    }
706}
707
708async fn read_request_line<R>(
709    reader: &mut BufReader<R>,
710    line: &mut Vec<u8>,
711) -> Result<usize, std::io::Error>
712where
713    R: tokio::io::AsyncRead + Unpin,
714{
715    line.clear();
716    loop {
717        let available = reader.fill_buf().await?;
718        if available.is_empty() {
719            return Ok(line.len());
720        }
721
722        let take = available
723            .iter()
724            .position(|byte| *byte == b'\n')
725            .map(|index| index + 1)
726            .unwrap_or(available.len());
727        if line.len() + take > MAX_REQUEST_LINE_BYTES {
728            return Err(std::io::Error::new(
729                std::io::ErrorKind::InvalidData,
730                format!("request line exceeds {MAX_REQUEST_LINE_BYTES} bytes"),
731            ));
732        }
733        line.extend_from_slice(&available[..take]);
734        reader.consume(take);
735        if line.last() == Some(&b'\n') {
736            return Ok(line.len());
737        }
738    }
739}
740
741async fn run_actor_task<B: Backend>(
742    mut state: SessionService<B>,
743    mut action_rx: mpsc::Receiver<ActionMessage>,
744    shutdown_tx: watch::Sender<bool>,
745) {
746    let mut poller = tokio::time::interval(POLL_INTERVAL);
747    poller.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
748
749    loop {
750        tokio::select! {
751            maybe_message = action_rx.recv() => match maybe_message {
752                Some(message) => {
753                    handle_action_message(&mut state, message);
754                    process_action_batch(&mut action_rx, &mut state, MAX_ACTIONS_PER_TURN.saturating_sub(1));
755                }
756                None => break,
757            },
758            _ = poller.tick() => state.tick(),
759        }
760
761        if state.should_shutdown() {
762            let _ = shutdown_tx.send(true);
763            break;
764        }
765    }
766}
767
768fn handle_action_message<B: Backend>(state: &mut SessionService<B>, message: ActionMessage) {
769    let response = state.handle_request(message.request);
770    let _ = message.response_tx.send(response);
771}
772
773fn process_action_batch<B: Backend>(
774    action_rx: &mut mpsc::Receiver<ActionMessage>,
775    state: &mut SessionService<B>,
776    limit: usize,
777) {
778    for _ in 0..limit {
779        let Some(message) = action_rx.try_recv().ok() else {
780            break;
781        };
782        handle_action_message(state, message);
783    }
784}
785
/// Returns the next deadline for a schedule with the given period, measured
/// from `now`. A period of zero is treated as one millisecond.
fn next_due_after(now: Instant, periodicity_ms: u64) -> Instant {
    let effective_ms = if periodicity_ms == 0 { 1 } else { periodicity_ms };
    let period = Duration::from_millis(effective_ms);
    now + period
}
789
790fn connect_identity(connect: &ConnectRequest) -> ConnectRequest {
791    let mut normalized = connect.clone();
792    normalized.dbcs.sort();
793    normalized
794}
795
796fn include_direction(event: &ObservedEvent, include_tx: bool) -> bool {
797    include_tx || matches!(event.direction, EventDirection::Rx)
798}
799
800fn raw_observation(event: &ObservedEvent) -> MessageObservation {
801    MessageObservation::Raw {
802        seq: event.seq,
803        direction: event.direction.clone(),
804        unix_ms: event.unix_ms,
805        arb_id: event.frame.arb_id,
806        extended: (event.frame.flags & crate::frame::CAN_FLAG_EXTENDED) != 0,
807        fd: (event.frame.flags & crate::frame::CAN_FLAG_FD) != 0,
808        len: event.frame.len,
809        payload_hex: payload_to_hex(event.frame.payload()),
810    }
811}
812
813fn semantic_observation(
814    event: &ObservedEvent,
815    decoded: Result<crate::can::dbc::DecodedSemanticMessage, String>,
816) -> Result<MessageObservation, String> {
817    let decoded = decoded?;
818    Ok(MessageObservation::Semantic {
819        seq: event.seq,
820        direction: event.direction.clone(),
821        unix_ms: event.unix_ms,
822        qualified_name: decoded.qualified_name,
823        arb_id: decoded.arb_id,
824        extended: decoded.extended,
825        fd: decoded.fd,
826        len: decoded.len,
827        payload_hex: payload_to_hex(event.frame.payload()),
828        signals: decoded.signals,
829    })
830}
831
/// Returns the current wall-clock time in milliseconds since the Unix epoch.
///
/// # Errors
/// Fails when the system clock reports a time before the epoch.
fn unix_ms_now() -> Result<u128, String> {
    match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
        Ok(elapsed) => Ok(elapsed.as_millis()),
        Err(err) => Err(format!("system clock error: {err}")),
    }
}
838
839#[cfg(test)]
840mod tests {
841    use super::{
842        ActiveTrace, Backend, EventDirection, MAX_REQUEST_LINE_BYTES, MessageListRequest,
843        RETENTION_WINDOW, Request, RequestAction, ResponseData, SessionService, TraceStartRequest,
844        read_request_line,
845    };
846    use crate::can::dbc::DbcRegistry;
847    use crate::frame::CanFrame;
848    use crate::protocol::{
849        ConnectRequest, DbcSpec, MessagePayload, MessageReadRequest, MessageSendRequest,
850        MessageStopRequest,
851    };
852    use std::collections::{BTreeMap, VecDeque};
853    use std::io::Write;
854    use std::path::{Path, PathBuf};
855    use std::time::{Duration, Instant};
856    use tokio::io::{AsyncWriteExt, BufReader, duplex};
857    use uuid::Uuid;
858
859    #[derive(Debug, Default)]
860    struct MockBackend {
861        recv_results: VecDeque<Result<Vec<CanFrame>, String>>,
862        send_results: VecDeque<Result<(), String>>,
863        sent: Vec<CanFrame>,
864    }
865
866    impl Backend for MockBackend {
867        fn recv_all(&mut self) -> Result<Vec<CanFrame>, String> {
868            self.recv_results
869                .pop_front()
870                .unwrap_or_else(|| Ok(Vec::new()))
871        }
872
873        fn send(&mut self, frame: &CanFrame) -> Result<(), String> {
874            if let Some(result) = self.send_results.pop_front() {
875                return result.map(|()| self.sent.push(frame.clone()));
876            }
877            self.sent.push(frame.clone());
878            Ok(())
879        }
880    }
881
882    struct FailingTrace {
883        path: PathBuf,
884        finish_error: Option<String>,
885        write_error: Option<String>,
886    }
887
888    impl FailingTrace {
889        fn on_finish(message: &str) -> Self {
890            Self {
891                path: PathBuf::from("/tmp/failing.asc"),
892                finish_error: Some(message.to_string()),
893                write_error: None,
894            }
895        }
896
897        fn on_write(message: &str) -> Self {
898            Self {
899                path: PathBuf::from("/tmp/failing.asc"),
900                finish_error: None,
901                write_error: Some(message.to_string()),
902            }
903        }
904    }
905
906    impl ActiveTrace for FailingTrace {
907        fn path(&self) -> &Path {
908            &self.path
909        }
910
911        fn write_event(
912            &mut self,
913            _direction: EventDirection,
914            _frame: &CanFrame,
915        ) -> Result<(), String> {
916            if let Some(err) = &self.write_error {
917                return Err(err.clone());
918            }
919            Ok(())
920        }
921
922        fn finish(self: Box<Self>) -> Result<PathBuf, String> {
923            if let Some(err) = self.finish_error {
924                return Err(err);
925            }
926            Ok(self.path)
927        }
928    }
929
930    fn write_temp_dbc(contents: &str) -> tempfile::NamedTempFile {
931        let mut file = tempfile::NamedTempFile::new().expect("tempfile");
932        write!(file, "{contents}").expect("write dbc");
933        file
934    }
935
936    fn sample_service(overlap: bool) -> SessionService<MockBackend> {
937        let dbc_a = write_temp_dbc(
938            "VERSION \"\"\nBO_ 291 Foo: 8 ECU\n SG_ enable : 0|8@1+ (1,0) [0|1] \"\" ECU\n",
939        );
940        let dbc_b = write_temp_dbc(
941            "VERSION \"\"\nBO_ 291 Bar: 8 ECU\n SG_ enable : 0|8@1+ (1,0) [0|255] \"\" ECU\n",
942        );
943        let dbcs = if overlap {
944            vec![
945                DbcSpec {
946                    alias: "a".to_string(),
947                    path: dbc_a.path().display().to_string(),
948                },
949                DbcSpec {
950                    alias: "b".to_string(),
951                    path: dbc_b.path().display().to_string(),
952                },
953            ]
954        } else {
955            vec![DbcSpec {
956                alias: "a".to_string(),
957                path: dbc_a.path().display().to_string(),
958            }]
959        };
960        let registry = DbcRegistry::load(&dbcs).expect("registry");
961        SessionService::new(
962            ConnectRequest {
963                adapter: "usb1".to_string(),
964                bitrate: 500_000,
965                bitrate_data: None,
966                fd: false,
967                dbcs,
968            },
969            registry,
970            MockBackend::default(),
971        )
972    }
973
974    fn empty_service() -> SessionService<MockBackend> {
975        SessionService::new(
976            ConnectRequest {
977                adapter: "usb1".to_string(),
978                bitrate: 500_000,
979                bitrate_data: None,
980                fd: false,
981                dbcs: Vec::new(),
982            },
983            DbcRegistry::empty(),
984            MockBackend::default(),
985        )
986    }
987
988    fn raw_frame(arb_id: u32, first_byte: u8) -> CanFrame {
989        raw_frame_with_len(arb_id, first_byte, 8)
990    }
991
992    fn raw_frame_with_len(arb_id: u32, first_byte: u8, len: u8) -> CanFrame {
993        let mut data = [0_u8; 64];
994        data[0] = first_byte;
995        CanFrame {
996            arb_id,
997            len,
998            flags: 0,
999            data,
1000        }
1001    }
1002
1003    #[test]
1004    fn connect_same_session_attaches_and_different_session_fails() {
1005        let service = sample_service(false);
1006        let same = service
1007            .handle_connect(service.connect.clone())
1008            .expect("same config must attach");
1009        assert!(same.already_connected);
1010
1011        let err = service
1012            .handle_connect(ConnectRequest {
1013                adapter: "usb2".to_string(),
1014                ..service.connect.clone()
1015            })
1016            .expect_err("different config must fail");
1017        assert!(err.contains("disconnect first"));
1018    }
1019
1020    #[test]
1021    fn connect_succeeds_with_zero_dbcs_loaded() {
1022        let service = empty_service();
1023        let same = service
1024            .handle_connect(service.connect.clone())
1025            .expect("same config must attach");
1026        assert!(same.already_connected);
1027        assert!(same.status.dbcs.is_empty());
1028    }
1029
1030    #[test]
1031    fn schema_reports_semantic_inventory_while_message_list_stays_empty_without_traffic() {
1032        let mut service = sample_service(false);
1033
1034        let ResponseData::Schema { messages } = service
1035            .handle_request(Request {
1036                id: Uuid::new_v4(),
1037                action: RequestAction::Schema(crate::protocol::SchemaRequest { filter: None }),
1038            })
1039            .data
1040            .expect("schema data")
1041        else {
1042            panic!("wrong schema response");
1043        };
1044        assert_eq!(messages.len(), 1);
1045        assert_eq!(messages[0].qualified_name, "a.Foo");
1046
1047        let ResponseData::MessageList { messages } = service
1048            .handle_message_list(MessageListRequest {
1049                filter: None,
1050                allow_raw: false,
1051                include_tx: false,
1052            })
1053            .expect("message list")
1054        else {
1055            panic!("wrong message list response");
1056        };
1057        assert!(messages.is_empty());
1058    }
1059
1060    #[test]
1061    fn message_list_splits_overlap_entries() {
1062        let mut service = sample_service(true);
1063        service
1064            .record_event(EventDirection::Rx, raw_frame(291, 1))
1065            .expect("record");
1066        let data = service
1067            .handle_message_list(MessageListRequest {
1068                filter: None,
1069                allow_raw: false,
1070                include_tx: false,
1071            })
1072            .expect("message list");
1073        let ResponseData::MessageList { messages } = data else {
1074            panic!("wrong response");
1075        };
1076        assert_eq!(messages.len(), 2);
1077        assert_eq!(messages[0].label, "a.Foo");
1078        assert_eq!(messages[1].label, "b.Bar");
1079    }
1080
1081    #[test]
1082    fn message_list_excluding_tx_uses_latest_rx_observation() {
1083        let mut service = sample_service(false);
1084        service
1085            .record_event(EventDirection::Rx, raw_frame_with_len(0x123, 1, 8))
1086            .expect("record rx");
1087        service
1088            .record_event(EventDirection::Tx, raw_frame_with_len(0x123, 2, 1))
1089            .expect("record tx");
1090
1091        let ResponseData::MessageList { messages } = service
1092            .handle_message_list(MessageListRequest {
1093                filter: None,
1094                allow_raw: false,
1095                include_tx: false,
1096            })
1097            .expect("message list")
1098        else {
1099            panic!("wrong response");
1100        };
1101
1102        assert_eq!(messages.len(), 1);
1103        assert_eq!(messages[0].len, 8);
1104        assert!(messages[0].has_rx);
1105        assert!(messages[0].has_tx);
1106    }
1107
1108    #[test]
1109    fn message_read_decodes_selected_alias() {
1110        let mut service = sample_service(true);
1111        service
1112            .record_event(EventDirection::Rx, raw_frame(291, 7))
1113            .expect("record");
1114        let data = service
1115            .handle_message_read(MessageReadRequest {
1116                select: "b.Bar".to_string(),
1117                count: None,
1118                include_tx: false,
1119            })
1120            .expect("message read");
1121        let ResponseData::MessageRead(result) = data else {
1122            panic!("wrong response");
1123        };
1124        let crate::protocol::MessageObservation::Semantic { qualified_name, .. } =
1125            &result.observations[0]
1126        else {
1127            panic!("expected semantic observation");
1128        };
1129        assert_eq!(qualified_name, "b.Bar");
1130    }
1131
1132    #[test]
1133    fn semantic_send_requires_full_signal_map_and_periodic_stop_works() {
1134        let dbc = write_temp_dbc(
1135            "VERSION \"\"\nBO_ 291 Foo: 8 ECU\n SG_ enable : 0|8@1+ (1,0) [0|1] \"\" ECU\n SG_ torque : 8|8@1+ (1,0) [0|255] \"\" ECU\n",
1136        );
1137        let dbcs = vec![DbcSpec {
1138            alias: "a".to_string(),
1139            path: dbc.path().display().to_string(),
1140        }];
1141        let registry = DbcRegistry::load(&dbcs).expect("registry");
1142        let mut service = SessionService::new(
1143            ConnectRequest {
1144                adapter: "usb1".to_string(),
1145                bitrate: 500_000,
1146                bitrate_data: None,
1147                fd: false,
1148                dbcs,
1149            },
1150            registry,
1151            MockBackend::default(),
1152        );
1153
1154        let mut incomplete = BTreeMap::new();
1155        incomplete.insert("enable".to_string(), 1.0);
1156        let err = service
1157            .handle_message_send(MessageSendRequest {
1158                target: "a.Foo".to_string(),
1159                data: MessagePayload::Signals(incomplete),
1160                periodicity_ms: None,
1161            })
1162            .expect_err("missing signal");
1163        assert!(err.contains("missing required signal 'torque'"));
1164
1165        let data = service
1166            .handle_message_send(MessageSendRequest {
1167                target: "0x123".to_string(),
1168                data: MessagePayload::RawHex("DEADBEEF".to_string()),
1169                periodicity_ms: Some(100),
1170            })
1171            .expect("schedule");
1172        let ResponseData::MessageSent(sent) = data else {
1173            panic!("wrong response");
1174        };
1175        assert_eq!(sent.target, "0x123");
1176
1177        let stopped = service
1178            .handle_message_stop(MessageStopRequest {
1179                target: "0x123".to_string(),
1180            })
1181            .expect("stop");
1182        let ResponseData::MessageStopped { stopped, .. } = stopped else {
1183            panic!("wrong response");
1184        };
1185        assert!(stopped);
1186    }
1187
1188    #[test]
1189    fn periodic_send_overwrites_existing_schedule_for_same_target() {
1190        let mut service = sample_service(false);
1191        service
1192            .handle_message_send(MessageSendRequest {
1193                target: "0x123".to_string(),
1194                data: MessagePayload::RawHex("AAAA".to_string()),
1195                periodicity_ms: Some(100),
1196            })
1197            .expect("initial schedule");
1198
1199        service
1200            .handle_message_send(MessageSendRequest {
1201                target: "0x123".to_string(),
1202                data: MessagePayload::RawHex("BBBB".to_string()),
1203                periodicity_ms: Some(250),
1204            })
1205            .expect("overwrite schedule");
1206
1207        assert_eq!(service.schedules.len(), 1);
1208        let schedule = service.schedules.get("0x123").expect("schedule");
1209        assert_eq!(schedule.periodicity_ms, 250);
1210        assert_eq!(
1211            crate::protocol::payload_to_hex(schedule.frame.payload()),
1212            "BBBB"
1213        );
1214    }
1215
1216    #[test]
1217    fn semantic_periodic_send_stores_raw_encoded_schedule_state() {
1218        let dbc = write_temp_dbc(
1219            "VERSION \"\"\nBO_ 291 Foo: 8 ECU\n SG_ enable : 0|8@1+ (1,0) [0|1] \"\" ECU\n SG_ torque : 8|8@1+ (1,0) [0|255] \"\" ECU\n",
1220        );
1221        let dbcs = vec![DbcSpec {
1222            alias: "a".to_string(),
1223            path: dbc.path().display().to_string(),
1224        }];
1225        let registry = DbcRegistry::load(&dbcs).expect("registry");
1226        let mut service = SessionService::new(
1227            ConnectRequest {
1228                adapter: "usb1".to_string(),
1229                bitrate: 500_000,
1230                bitrate_data: None,
1231                fd: false,
1232                dbcs,
1233            },
1234            registry,
1235            MockBackend::default(),
1236        );
1237
1238        let mut data = BTreeMap::new();
1239        data.insert("enable".to_string(), 1.0);
1240        data.insert("torque".to_string(), 12.0);
1241        service
1242            .handle_message_send(MessageSendRequest {
1243                target: "a.Foo".to_string(),
1244                data: MessagePayload::Signals(data),
1245                periodicity_ms: Some(100),
1246            })
1247            .expect("semantic schedule");
1248
1249        let schedule = service.schedules.get("a.Foo").expect("schedule");
1250        assert_eq!(schedule.frame.arb_id, 291);
1251        assert_eq!(schedule.frame.len, 8);
1252        assert_eq!(
1253            crate::protocol::payload_to_hex(schedule.frame.payload()),
1254            "010C000000000000"
1255        );
1256    }
1257
1258    #[test]
1259    fn message_read_returns_error_when_no_matching_traffic_exists() {
1260        let service = sample_service(false);
1261        let err = service
1262            .handle_message_read(MessageReadRequest {
1263                select: "a.Foo".to_string(),
1264                count: None,
1265                include_tx: false,
1266            })
1267            .expect_err("empty traffic must fail");
1268        assert!(err.contains("matched no observed traffic"));
1269    }
1270
1271    #[test]
1272    fn disconnect_still_shuts_down_when_trace_finish_fails() {
1273        let mut service = sample_service(false);
1274        service.trace = Some(Box::new(FailingTrace::on_finish("flush failed")));
1275        service.schedules.insert(
1276            "0x123".to_string(),
1277            super::PeriodicScheduleState {
1278                target: "0x123".to_string(),
1279                frame: raw_frame(0x123, 1),
1280                periodicity_ms: 100,
1281                next_due: Instant::now() + Duration::from_millis(100),
1282            },
1283        );
1284
1285        let response = service.handle_disconnect().expect("disconnect");
1286        assert!(matches!(response, ResponseData::Disconnected));
1287        assert!(service.should_shutdown());
1288        assert!(service.schedules.is_empty());
1289        assert_eq!(service.backend_error.as_deref(), Some("flush failed"));
1290        assert!(service.trace.is_none());
1291    }
1292
1293    #[test]
1294    fn periodic_send_failure_reschedules_and_clears_degraded_after_recovery() {
1295        let mut service = sample_service(false);
1296        let data = service
1297            .handle_message_send(MessageSendRequest {
1298                target: "0x123".to_string(),
1299                data: MessagePayload::RawHex("DEADBEEF".to_string()),
1300                periodicity_ms: Some(50),
1301            })
1302            .expect("schedule");
1303        assert!(matches!(data, ResponseData::MessageSent(_)));
1304
1305        let schedule = service.schedules.get_mut("0x123").expect("schedule state");
1306        schedule.next_due = Instant::now() - Duration::from_millis(1);
1307        service
1308            .backend
1309            .send_results
1310            .push_back(Err("tx failed".to_string()));
1311
1312        service.tick_schedules();
1313        assert_eq!(service.backend.sent.len(), 0);
1314        assert_eq!(service.backend_error.as_deref(), Some("tx failed"));
1315        let failed_next_due = service.schedules["0x123"].next_due;
1316        assert!(failed_next_due > Instant::now() - Duration::from_millis(1));
1317
1318        service.tick_schedules();
1319        assert_eq!(service.backend.sent.len(), 0);
1320        assert_eq!(service.backend_error.as_deref(), Some("tx failed"));
1321
1322        service
1323            .schedules
1324            .get_mut("0x123")
1325            .expect("schedule")
1326            .next_due = Instant::now() - Duration::from_millis(1);
1327        service.backend.send_results.push_back(Ok(()));
1328        service.tick_schedules();
1329
1330        assert_eq!(service.backend.sent.len(), 1);
1331        assert_eq!(service.backend_error, None);
1332    }
1333
1334    #[test]
1335    fn periodic_send_trace_errors_surface_without_dropping_event_state() {
1336        let mut service = sample_service(false);
1337        service.trace = Some(Box::new(FailingTrace::on_write("trace write failed")));
1338        service.schedules.insert(
1339            "0x123".to_string(),
1340            super::PeriodicScheduleState {
1341                target: "0x123".to_string(),
1342                frame: raw_frame(0x123, 1),
1343                periodicity_ms: 25,
1344                next_due: Instant::now() - Duration::from_millis(1),
1345            },
1346        );
1347
1348        service.tick_schedules();
1349
1350        assert_eq!(service.backend.sent.len(), 1);
1351        assert_eq!(service.backend_error.as_deref(), Some("trace write failed"));
1352        assert_eq!(service.events.len(), 1);
1353        assert!(
1354            service
1355                .latest
1356                .contains_key(&crate::can::dbc::FrameIdentity {
1357                    arb_id: 0x123,
1358                    extended: false,
1359                })
1360        );
1361    }
1362
1363    #[test]
1364    fn tick_trims_expired_events_and_rebuilds_latest_index() {
1365        let mut service = sample_service(false);
1366        service
1367            .record_event(EventDirection::Rx, raw_frame(0x123, 1))
1368            .expect("record");
1369        service.events[0].recorded_at = Instant::now() - RETENTION_WINDOW - Duration::from_secs(1);
1370        service.tick();
1371        assert!(service.events.is_empty());
1372        assert!(service.latest.is_empty());
1373    }
1374
1375    #[test]
1376    fn degraded_state_clears_after_successful_backend_poll() {
1377        let mut service = sample_service(false);
1378        service
1379            .backend
1380            .recv_results
1381            .push_back(Err("rx failed".to_string()));
1382        service.backend.recv_results.push_back(Ok(Vec::new()));
1383        service.tick();
1384        assert_eq!(service.status().connection_state, "degraded");
1385        service.tick();
1386        assert_eq!(service.status().connection_state, "connected");
1387    }
1388
1389    #[test]
1390    fn trace_start_stop_and_degraded_backend_preserve_status() {
1391        let mut service = sample_service(false);
1392        let tempdir = tempfile::tempdir().expect("trace dir");
1393        let path = tempdir.path().join("trace.asc").display().to_string();
1394        let started = service
1395            .handle_trace_start(TraceStartRequest { path: path.clone() })
1396            .expect("trace start");
1397        assert!(matches!(started, ResponseData::TraceStarted { .. }));
1398        let stopped = service.handle_trace_stop().expect("trace stop");
1399        assert!(matches!(stopped, ResponseData::TraceStopped { .. }));
1400    }
1401
1402    #[test]
1403    fn request_round_trip_through_service_serializes_current_verbs() {
1404        let mut service = sample_service(false);
1405        let response = service.handle_request(Request {
1406            id: Uuid::new_v4(),
1407            action: RequestAction::Status,
1408        });
1409        assert!(response.success);
1410    }
1411
1412    #[tokio::test]
1413    async fn read_request_line_rejects_oversized_requests() {
1414        let (mut writer, reader) = duplex(1024);
1415        let writer_task = tokio::spawn(async move {
1416            let oversized = vec![b'a'; MAX_REQUEST_LINE_BYTES + 1];
1417            let _ = writer.write_all(&oversized).await;
1418        });
1419
1420        let mut reader = BufReader::new(reader);
1421        let mut line = Vec::new();
1422        let err = read_request_line(&mut reader, &mut line)
1423            .await
1424            .expect_err("oversized line must fail");
1425        assert_eq!(err.kind(), std::io::ErrorKind::InvalidData);
1426        drop(reader);
1427        writer_task.await.expect("writer task");
1428    }
1429
1430    #[test]
1431    fn next_due_after_never_reuses_due_now_deadline() {
1432        let now = Instant::now();
1433        assert!(super::next_due_after(now, 0) >= now + Duration::from_millis(1));
1434        assert!(super::next_due_after(now, 25) >= now + Duration::from_millis(25));
1435    }
1436}