// memfault_ssf/shared_service_thread.rs
use std::{
    any::TypeId,
    borrow::BorrowMut,
    sync::{mpsc::Receiver, Arc, Mutex},
    thread::spawn,
};

use crate::{Envelope, Mailbox, Service, ShutdownServiceMessage, StatsAggregator};
12
13pub struct SharedServiceThread<S: Service> {
21 mailbox: Mailbox<S>,
22 service: Arc<Mutex<S>>,
23}
24
25impl<S: Service + Send + 'static> SharedServiceThread<S> {
26 pub fn spawn_with(service: S) -> Self {
27 let (mailbox, receiver) = Mailbox::create();
28 let shared_service = Arc::new(Mutex::new(service));
29 {
30 let shared_service = shared_service.clone();
31 let _handle = spawn(move || SharedServiceThread::run(shared_service, receiver));
32 }
33
34 SharedServiceThread {
35 mailbox,
36 service: shared_service,
37 }
38 }
39
40 pub fn mbox(&self) -> Mailbox<S> {
41 self.mailbox.clone()
42 }
43
44 pub fn run(
45 service: Arc<Mutex<S>>,
46 receiver: Receiver<Envelope<S>>,
47 ) -> Result<StatsAggregator, &'static str> {
48 let mut stats_aggregator = StatsAggregator::new();
49 for mut envelope in receiver {
50 let type_id = envelope.message_type_id();
51 match service.lock().borrow_mut() {
52 Ok(service) => match envelope.deliver_to(service) {
53 Err(_e) => {
54 return Err("delivery failed");
56 }
57 Ok(stats) => {
58 stats_aggregator.add(&stats);
59 }
60 },
61 Err(_) => {
62 return Err("Shared mutex is poisoned. Shutting down.");
63 }
64 }
65 if type_id == Some(TypeId::of::<ShutdownServiceMessage>()) {
66 break;
67 }
68 }
69 Ok(stats_aggregator)
70 }
71
72 pub fn shared(&self) -> Arc<Mutex<S>> {
73 self.service.clone()
74 }
75}