eva_common/
services.rs

1use crate::registry;
2use crate::Value;
3use crate::{EResult, Error};
4use busrt::rpc::{self, RpcClient, RpcHandlers};
5#[cfg(all(feature = "openssl3", feature = "fips"))]
6use once_cell::sync::OnceCell;
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::ffi::CString;
10use std::fmt;
11#[cfg(feature = "extended-value")]
12use std::path::Path;
13use std::sync::atomic;
14use std::sync::Arc;
15use std::time::Duration;
16
/// Version number written into the `version` field of [`Initial`] by
/// [`Initial::new`].
pub const SERVICE_CONFIG_VERSION: u16 = 4;

/// Payload kind marker with the value `0`.
// NOTE(review): presumably tags a "ping" payload sent to a service; not used in
// this file - confirm against the launcher code.
pub const SERVICE_PAYLOAD_PING: u8 = 0;
/// Payload kind marker with the value `1`.
// NOTE(review): presumably tags the payload that carries `Initial`; not used in
// this file - confirm against the launcher code.
pub const SERVICE_PAYLOAD_INITIAL: u8 = 1;
21
/// One-shot marker claimed by `enable_fips` on OpenSSL 3 builds, so that a
/// repeated attempt to load the FIPS provider is rejected.
#[cfg(all(feature = "openssl3", feature = "fips"))]
#[allow(dead_code)]
static FIPS_LOADED: OnceCell<()> = OnceCell::new();
25
26#[cfg(any(
27    feature = "openssl-vendored",
28    feature = "openssl-no-fips",
29    not(feature = "fips")
30))]
31pub fn enable_fips() -> EResult<()> {
32    Err(Error::failed(
33        "FIPS can not be enabled, consider using a native OS distribution",
34    ))
35}
36
/// Switches OpenSSL into FIPS mode for the current process.
///
/// With the `openssl3` feature the "fips" provider is loaded and its handle is
/// intentionally leaked; without it, the legacy `openssl::fips::enable` call is
/// used.
///
/// # Errors
///
/// * a "core" error if the provider has already been requested (`openssl3`
///   builds only)
/// * the converted OpenSSL error if the provider can not be loaded or FIPS mode
///   can not be enabled
#[cfg(not(any(feature = "openssl-vendored", feature = "openssl-no-fips")))]
#[cfg(feature = "fips")]
pub fn enable_fips() -> EResult<()> {
    #[cfg(feature = "openssl3")]
    {
        // The marker is claimed before loading, so a second call is rejected
        // instead of loading the provider twice.
        FIPS_LOADED
            .set(())
            .map_err(|_| Error::core("FIPS provider already loaded"))?;
        // `forget` prevents the provider handle's destructor from running, so
        // the handle is never released by this process.
        std::mem::forget(openssl::provider::Provider::load(None, "fips")?);
    }
    #[cfg(not(feature = "openssl3"))]
    openssl::fips::enable(true)?;
    Ok(())
}
51
52pub struct Registry {
53    id: String,
54    rpc: Arc<RpcClient>,
55}
56
57impl Registry {
58    #[inline]
59    pub async fn key_set<V>(&self, key: &str, value: V) -> EResult<Value>
60    where
61        V: Serialize,
62    {
63        registry::key_set(
64            &registry::format_svc_data_subkey(&self.id),
65            key,
66            value,
67            &self.rpc,
68        )
69        .await
70    }
71    #[inline]
72    pub async fn key_get(&self, key: &str) -> EResult<Value> {
73        registry::key_get(&registry::format_svc_data_subkey(&self.id), key, &self.rpc).await
74    }
75    #[inline]
76    pub async fn key_userdata_get(&self, key: &str) -> EResult<Value> {
77        registry::key_get(registry::R_USER_DATA, key, &self.rpc).await
78    }
79    #[inline]
80    pub async fn key_increment(&self, key: &str) -> EResult<i64> {
81        registry::key_increment(&registry::format_svc_data_subkey(&self.id), key, &self.rpc).await
82    }
83
84    #[inline]
85    pub async fn key_decrement(&self, key: &str) -> EResult<i64> {
86        registry::key_decrement(&registry::format_svc_data_subkey(&self.id), key, &self.rpc).await
87    }
88    #[inline]
89    pub async fn key_get_recursive(&self, key: &str) -> EResult<Vec<(String, Value)>> {
90        registry::key_get_recursive(&registry::format_svc_data_subkey(&self.id), key, &self.rpc)
91            .await
92    }
93    #[inline]
94    pub async fn key_delete(&self, key: &str) -> EResult<Value> {
95        registry::key_delete(&registry::format_svc_data_subkey(&self.id), key, &self.rpc).await
96    }
97    #[inline]
98    pub async fn key_delete_recursive(&self, key: &str) -> EResult<Value> {
99        registry::key_delete_recursive(&registry::format_svc_data_subkey(&self.id), key, &self.rpc)
100            .await
101    }
102}
103
/// Serde default for the `workers` field of `Initial`: a single worker.
#[inline]
fn default_workers() -> u32 {
    const SINGLE_WORKER: u32 = 1;
    SINGLE_WORKER
}
108
/// Realtime settings of a service.
///
/// Every field may be omitted on deserialization and then takes its type's
/// default value.
#[derive(Default, Clone, Debug, Serialize, Deserialize)]
pub struct RealtimeConfig {
    /// Optional priority value.
    // NOTE(review): presumably the scheduler priority of the service process -
    // confirm against the code that applies this config.
    #[serde(default)]
    pub priority: Option<i32>,
    /// List of CPU ids; empty when omitted.
    // NOTE(review): presumably the CPUs the service is pinned to - confirm.
    #[serde(default)]
    pub cpu_ids: Vec<usize>,
    /// Optional heap pre-allocation amount.
    // NOTE(review): unit is not visible in this file (presumably bytes) - confirm.
    #[serde(default)]
    pub prealloc_heap: Option<usize>,
}
118
119/// Initial properties for services
120#[derive(Debug, Serialize, Deserialize)]
121pub struct Initial {
122    #[serde(rename = "version")]
123    config_version: u16,
124    system_name: String,
125    id: String,
126    command: String,
127    #[serde(default)]
128    prepare_command: Option<String>,
129    data_path: String,
130    timeout: Timeout,
131    core: CoreInfo,
132    bus: BusConfig,
133    #[serde(default)]
134    realtime: RealtimeConfig,
135    #[serde(default)]
136    config: Option<Value>,
137    #[serde(default = "default_workers")]
138    workers: u32,
139    #[serde(default)]
140    user: Option<String>,
141    #[serde(default)]
142    react_to_fail: bool,
143    #[serde(
144        serialize_with = "crate::tools::serialize_atomic_bool",
145        deserialize_with = "crate::tools::deserialize_atomic_bool"
146    )]
147    fail_mode: atomic::AtomicBool,
148    #[serde(default)]
149    fips: bool,
150    #[serde(default)]
151    call_tracing: bool,
152}
153
154impl Initial {
155    #[allow(clippy::too_many_arguments)]
156    pub fn new(
157        id: &str,
158        system_name: &str,
159        command: &str,
160        prepare_command: Option<&str>,
161        data_path: &str,
162        timeout: &Timeout,
163        core_info: CoreInfo,
164        bus: BusConfig,
165        config: Option<&Value>,
166        workers: u32,
167        user: Option<&str>,
168        react_to_fail: bool,
169        fips: bool,
170        call_tracing: bool,
171    ) -> Self {
172        Self {
173            config_version: SERVICE_CONFIG_VERSION,
174            system_name: system_name.to_owned(),
175            id: id.to_owned(),
176            command: command.to_owned(),
177            prepare_command: prepare_command.map(ToOwned::to_owned),
178            data_path: data_path.to_owned(),
179            timeout: timeout.clone(),
180            core: core_info,
181            bus,
182            realtime: <_>::default(),
183            config: config.cloned(),
184            workers,
185            user: user.map(ToOwned::to_owned),
186            react_to_fail,
187            fail_mode: atomic::AtomicBool::new(false),
188            fips,
189            call_tracing,
190        }
191    }
192    pub fn with_realtime(mut self, realtime: RealtimeConfig) -> Self {
193        self.realtime = realtime;
194        self
195    }
196    #[inline]
197    pub fn init(&self) -> EResult<()> {
198        #[cfg(feature = "openssl-no-fips")]
199        if self.fips {
200            return Err(Error::not_implemented(
201                "no FIPS 140 support, disable FIPS or switch to native package",
202            ));
203        }
204        if self.fips {
205            enable_fips()?;
206        }
207        Ok(())
208    }
209    #[inline]
210    pub fn config_version(&self) -> u16 {
211        self.config_version
212    }
213    #[inline]
214    pub fn system_name(&self) -> &str {
215        &self.system_name
216    }
217    #[inline]
218    pub fn id(&self) -> &str {
219        &self.id
220    }
221    #[inline]
222    pub fn command(&self) -> &str {
223        &self.command
224    }
225    pub fn realtime(&self) -> &RealtimeConfig {
226        &self.realtime
227    }
228    #[inline]
229    pub fn prepare_command(&self) -> Option<&str> {
230        self.prepare_command.as_deref()
231    }
232    #[inline]
233    pub fn user(&self) -> Option<&str> {
234        self.user.as_deref()
235    }
236    pub fn set_user(&mut self, user: Option<&str>) {
237        self.user = user.map(ToOwned::to_owned);
238    }
239    pub fn set_id(&mut self, id: &str) {
240        id.clone_into(&mut self.id);
241    }
242    #[inline]
243    pub fn data_path(&self) -> Option<&str> {
244        if let Some(ref user) = self.user {
245            if user == "nobody" {
246                return None;
247            }
248        }
249        Some(&self.data_path)
250    }
251    #[inline]
252    pub fn planned_data_path(&self) -> &str {
253        &self.data_path
254    }
255    pub fn set_data_path(&mut self, path: &str) {
256        path.clone_into(&mut self.data_path);
257    }
258    #[inline]
259    pub fn timeout(&self) -> Duration {
260        self.timeout
261            .default
262            .map_or(crate::DEFAULT_TIMEOUT, Duration::from_secs_f64)
263    }
264    #[inline]
265    pub fn startup_timeout(&self) -> Duration {
266        self.timeout
267            .startup
268            .map_or_else(|| self.timeout(), Duration::from_secs_f64)
269    }
270    #[inline]
271    pub fn shutdown_timeout(&self) -> Duration {
272        self.timeout
273            .shutdown
274            .map_or_else(|| self.timeout(), Duration::from_secs_f64)
275    }
276    #[inline]
277    pub fn bus_timeout(&self) -> Duration {
278        self.bus
279            .timeout
280            .map_or_else(|| self.timeout(), Duration::from_secs_f64)
281    }
282    #[inline]
283    pub fn eva_build(&self) -> u64 {
284        self.core.build
285    }
286    #[inline]
287    pub fn eva_version(&self) -> &str {
288        &self.core.version
289    }
290    #[inline]
291    pub fn eapi_version(&self) -> u16 {
292        self.core.eapi_verion
293    }
294    #[inline]
295    pub fn eva_dir(&self) -> &str {
296        &self.core.path
297    }
298    #[inline]
299    pub fn eva_log_level(&self) -> u8 {
300        self.core.log_level
301    }
302    #[inline]
303    pub fn core_active(&self) -> bool {
304        self.core.active
305    }
306    #[inline]
307    pub fn call_tracing(&self) -> bool {
308        self.call_tracing
309    }
310    #[inline]
311    pub fn eva_log_level_filter(&self) -> log::LevelFilter {
312        match self.core.log_level {
313            crate::LOG_LEVEL_TRACE => log::LevelFilter::Trace,
314            crate::LOG_LEVEL_DEBUG => log::LevelFilter::Debug,
315            crate::LOG_LEVEL_WARN => log::LevelFilter::Warn,
316            crate::LOG_LEVEL_ERROR => log::LevelFilter::Error,
317            crate::LOG_LEVEL_OFF => log::LevelFilter::Off,
318            _ => log::LevelFilter::Info,
319        }
320    }
321    #[inline]
322    pub fn bus_config(&self) -> EResult<busrt::ipc::Config> {
323        if self.bus.tp == "native" {
324            Ok(busrt::ipc::Config::new(&self.bus.path, &self.id)
325                .buf_size(self.bus.buf_size)
326                .buf_ttl(Duration::from_micros(self.bus.buf_ttl))
327                .queue_size(self.bus.queue_size)
328                .timeout(self.bus_timeout()))
329        } else {
330            Err(Error::not_implemented(format!(
331                "bus type {} is not supported",
332                self.bus.tp
333            )))
334        }
335    }
336    #[inline]
337    pub fn bus_config_for_sub(&self, sub_id: &str) -> EResult<busrt::ipc::Config> {
338        if self.bus.tp == "native" {
339            Ok(
340                busrt::ipc::Config::new(&self.bus.path, &format!("{}::{}", self.id, sub_id))
341                    .buf_size(self.bus.buf_size)
342                    .buf_ttl(Duration::from_micros(self.bus.buf_ttl))
343                    .queue_size(self.bus.queue_size)
344                    .timeout(self.bus_timeout()),
345            )
346        } else {
347            Err(Error::not_implemented(format!(
348                "bus type {} is not supported",
349                self.bus.tp
350            )))
351        }
352    }
353    pub fn set_bus_path(&mut self, path: &str) {
354        path.clone_into(&mut self.bus.path);
355    }
356    #[inline]
357    pub fn bus_path(&self) -> &str {
358        &self.bus.path
359    }
360    #[inline]
361    pub fn config(&self) -> Option<&Value> {
362        self.config.as_ref()
363    }
364    #[cfg(feature = "extended-value")]
365    #[inline]
366    pub async fn extend_config(&mut self, timeout: Duration, base: &Path) -> EResult<()> {
367        self.config = if let Some(config) = self.config.take() {
368            Some(config.extend(timeout, base).await?)
369        } else {
370            None
371        };
372        Ok(())
373    }
374    #[inline]
375    pub fn workers(&self) -> u32 {
376        self.workers
377    }
378    #[inline]
379    pub fn bus_queue_size(&self) -> usize {
380        self.bus.queue_size
381    }
382    #[inline]
383    pub fn take_config(&mut self) -> Option<Value> {
384        self.config.take()
385    }
386    #[inline]
387    pub async fn init_rpc<R>(&self, handlers: R) -> EResult<Arc<RpcClient>>
388    where
389        R: RpcHandlers + Send + Sync + 'static,
390    {
391        self.init_rpc_opts(handlers, rpc::Options::default()).await
392    }
393    #[inline]
394    pub async fn init_rpc_blocking<R>(&self, handlers: R) -> EResult<Arc<RpcClient>>
395    where
396        R: RpcHandlers + Send + Sync + 'static,
397    {
398        self.init_rpc_opts(
399            handlers,
400            rpc::Options::new()
401                .blocking_notifications()
402                .blocking_frames(),
403        )
404        .await
405    }
406    #[inline]
407    pub async fn init_rpc_blocking_with_secondary<R>(
408        &self,
409        handlers: R,
410    ) -> EResult<(Arc<RpcClient>, Arc<RpcClient>)>
411    where
412        R: RpcHandlers + Send + Sync + 'static,
413    {
414        let bus = self.init_bus_client().await?;
415        let bus_secondary = bus.register_secondary().await?;
416        let opts = rpc::Options::new()
417            .blocking_notifications()
418            .blocking_frames();
419        let rpc = Arc::new(RpcClient::create(bus, handlers, opts.clone()));
420        let rpc_secondary = Arc::new(RpcClient::create0(bus_secondary, opts));
421        Ok((rpc, rpc_secondary))
422    }
423    pub async fn init_rpc_opts<R>(&self, handlers: R, opts: rpc::Options) -> EResult<Arc<RpcClient>>
424    where
425        R: RpcHandlers + Send + Sync + 'static,
426    {
427        let bus = self.init_bus_client().await?;
428        let rpc = RpcClient::create(bus, handlers, opts);
429        Ok(Arc::new(rpc))
430    }
431    pub async fn init_bus_client(&self) -> EResult<busrt::ipc::Client> {
432        let bus = tokio::time::timeout(
433            self.bus_timeout(),
434            busrt::ipc::Client::connect(&self.bus_config()?),
435        )
436        .await??;
437        Ok(bus)
438    }
439    pub async fn init_bus_client_sub(&self, sub_id: &str) -> EResult<busrt::ipc::Client> {
440        let bus = tokio::time::timeout(
441            self.bus_timeout(),
442            busrt::ipc::Client::connect(&self.bus_config_for_sub(sub_id)?),
443        )
444        .await??;
445        Ok(bus)
446    }
447    #[inline]
448    pub fn init_registry(&self, rpc: &Arc<RpcClient>) -> Registry {
449        Registry {
450            id: self.id.clone(),
451            rpc: rpc.clone(),
452        }
453    }
454    #[inline]
455    pub fn can_rtf(&self) -> bool {
456        self.react_to_fail
457    }
458    #[inline]
459    pub fn is_mode_normal(&self) -> bool {
460        !self.fail_mode.load(atomic::Ordering::SeqCst)
461    }
462    #[inline]
463    pub fn is_mode_rtf(&self) -> bool {
464        self.fail_mode.load(atomic::Ordering::SeqCst)
465    }
466    #[inline]
467    pub fn set_fail_mode(&self, mode: bool) {
468        self.fail_mode.store(mode, atomic::Ordering::SeqCst);
469    }
470    #[cfg(not(target_os = "windows"))]
471    #[inline]
472    pub fn drop_privileges(&self) -> EResult<()> {
473        if let Some(ref user) = self.user {
474            if !user.is_empty() {
475                let u = get_system_user(user)?;
476                if nix::unistd::getuid() != u.uid {
477                    let c_user = CString::new(user.as_str()).map_err(|e| {
478                        Error::failed(format!("Failed to parse user {}: {}", user, e))
479                    })?;
480
481                    let groups = nix::unistd::getgrouplist(&c_user, u.gid).map_err(|e| {
482                        Error::failed(format!("Failed to get groups for user {}: {}", user, e))
483                    })?;
484                    nix::unistd::setgroups(&groups).map_err(|e| {
485                        Error::failed(format!(
486                            "Failed to switch the process groups for user {}: {}",
487                            user, e
488                        ))
489                    })?;
490                    nix::unistd::setgid(u.gid).map_err(|e| {
491                        Error::failed(format!(
492                            "Failed to switch the process group for user {}: {}",
493                            user, e
494                        ))
495                    })?;
496                    nix::unistd::setuid(u.uid).map_err(|e| {
497                        Error::failed(format!(
498                            "Failed to switch the process user to {}: {}",
499                            user, e
500                        ))
501                    })?;
502                }
503            }
504        }
505        Ok(())
506    }
507    pub fn into_legacy_compat(mut self) -> Self {
508        self.data_path = self.data_path().unwrap_or_default().to_owned();
509        let user = self.user.take().unwrap_or_default();
510        self.user.replace(user);
511        let timeout = self
512            .timeout
513            .default
514            .unwrap_or(crate::DEFAULT_TIMEOUT.as_secs_f64());
515        self.timeout.default.replace(timeout);
516        if self.timeout.startup.is_none() {
517            self.timeout.startup.replace(timeout);
518        }
519        if self.timeout.shutdown.is_none() {
520            self.timeout.shutdown.replace(timeout);
521        }
522        let config = self
523            .take_config()
524            .unwrap_or_else(|| Value::Map(<_>::default()));
525        self.config.replace(config);
526        self
527    }
528}
529
530#[cfg(not(target_os = "windows"))]
531pub fn get_system_user(user: &str) -> EResult<nix::unistd::User> {
532    let u = nix::unistd::User::from_name(user)
533        .map_err(|e| Error::failed(format!("failed to get the system user {}: {}", user, e)))?
534        .ok_or_else(|| Error::failed(format!("Failed to locate the system user {}", user)))?;
535    Ok(u)
536}
537
538#[cfg(not(target_os = "windows"))]
539pub fn get_system_group(group: &str) -> EResult<nix::unistd::Group> {
540    let g = nix::unistd::Group::from_name(group)
541        .map_err(|e| Error::failed(format!("failed to get the system group {}: {}", group, e)))?
542        .ok_or_else(|| Error::failed(format!("Failed to locate the system group {}", group)))?;
543    Ok(g)
544}
545
546#[derive(Debug, Serialize, Deserialize, Clone, Default)]
547pub struct Timeout {
548    startup: Option<f64>,
549    shutdown: Option<f64>,
550    default: Option<f64>,
551}
552
553impl Timeout {
554    pub fn offer(&mut self, timeout: f64) {
555        if self.startup.is_none() {
556            self.startup.replace(timeout);
557        }
558        if self.shutdown.is_none() {
559            self.shutdown.replace(timeout);
560        }
561        if self.default.is_none() {
562            self.default.replace(timeout);
563        }
564    }
565    pub fn get(&self) -> Option<Duration> {
566        self.default.map(Duration::from_secs_f64)
567    }
568    pub fn startup(&self) -> Option<Duration> {
569        self.startup.map(Duration::from_secs_f64)
570    }
571    pub fn shutdown(&self) -> Option<Duration> {
572        self.shutdown.map(Duration::from_secs_f64)
573    }
574}
575
576#[derive(Debug, Serialize, Deserialize)]
577pub struct CoreInfo {
578    build: u64,
579    version: String,
580    eapi_verion: u16,
581    path: String,
582    log_level: u8,
583    active: bool,
584}
585
586impl CoreInfo {
587    pub fn new(
588        build: u64,
589        version: &str,
590        eapi_verion: u16,
591        path: &str,
592        log_level: u8,
593        active: bool,
594    ) -> Self {
595        Self {
596            build,
597            version: version.to_owned(),
598            eapi_verion,
599            path: path.to_owned(),
600            log_level,
601            active,
602        }
603    }
604}
605
/// Serde default for the bus `type` field: the "native" bus.
#[inline]
fn default_bus_type() -> String {
    String::from("native")
}
610
611#[inline]
612fn default_bus_buf_size() -> usize {
613    busrt::DEFAULT_BUF_SIZE
614}
615
616#[allow(clippy::cast_possible_truncation)]
617#[inline]
618fn default_bus_buf_ttl() -> u64 {
619    busrt::DEFAULT_BUF_TTL.as_micros() as u64
620}
621
622#[inline]
623fn default_bus_queue_size() -> usize {
624    busrt::DEFAULT_QUEUE_SIZE
625}
626
627#[derive(Debug, Clone, Deserialize, Serialize)]
628pub struct BusConfig {
629    #[serde(rename = "type", default = "default_bus_type")]
630    tp: String,
631    path: String,
632    timeout: Option<f64>,
633    #[serde(default = "default_bus_buf_size")]
634    buf_size: usize,
635    #[serde(default = "default_bus_buf_ttl")]
636    buf_ttl: u64, // microseconds
637    #[serde(default = "default_bus_queue_size")]
638    queue_size: usize,
639    // deprecated field, as BUS/RT RPC uses timeout as a ping interval
640    #[serde(rename = "ping_interval", skip_serializing, default)]
641    _ping_interval: f64,
642}
643
644impl BusConfig {
645    pub fn path(&self) -> &str {
646        &self.path
647    }
648    pub fn set_path(&mut self, path: &str) {
649        path.clone_into(&mut self.path);
650    }
651    pub fn offer_timeout(&mut self, timeout: f64) {
652        if self.timeout.is_none() {
653            self.timeout.replace(timeout);
654        }
655    }
656}
657
/// Description of a single method parameter.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MethodParamInfo {
    /// Whether the parameter is required; `false` when omitted
    #[serde(default)]
    pub required: bool,
}
663
/// Description of a service method, stored in `ServiceInfo::methods`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MethodInfo {
    /// Human-readable description; empty when omitted
    #[serde(default)]
    pub description: String,
    /// Parameters by name (must be present on deserialization)
    pub params: HashMap<String, MethodParamInfo>,
}
670
671/// info-structure only, can be used by clients for auto-completion
672pub struct ServiceMethod {
673    pub name: String,
674    pub description: String,
675    pub params: HashMap<String, MethodParamInfo>,
676}
677
678impl ServiceMethod {
679    pub fn new(name: &str) -> Self {
680        Self {
681            name: name.to_owned(),
682            description: String::new(),
683            params: <_>::default(),
684        }
685    }
686    pub fn description(mut self, desc: &str) -> Self {
687        desc.clone_into(&mut self.description);
688        self
689    }
690    pub fn required(mut self, name: &str) -> Self {
691        self.params
692            .insert(name.to_owned(), MethodParamInfo { required: true });
693        self
694    }
695    pub fn optional(mut self, name: &str) -> Self {
696        self.params
697            .insert(name.to_owned(), MethodParamInfo { required: false });
698        self
699    }
700}
701
702/// Returned by all services on "info" RPC command
703#[derive(Serialize, Deserialize, Debug, Clone)]
704pub struct ServiceInfo {
705    #[serde(default)]
706    pub author: String,
707    #[serde(default)]
708    pub version: String,
709    #[serde(default)]
710    pub description: String,
711    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
712    pub methods: HashMap<String, MethodInfo>,
713}
714
715impl ServiceInfo {
716    pub fn new(author: &str, version: &str, description: &str) -> Self {
717        Self {
718            author: author.to_owned(),
719            version: version.to_owned(),
720            description: description.to_owned(),
721            methods: <_>::default(),
722        }
723    }
724    #[inline]
725    pub fn add_method(&mut self, method: ServiceMethod) {
726        self.methods.insert(
727            method.name,
728            MethodInfo {
729                description: method.description,
730                params: method.params,
731            },
732        );
733    }
734}
735
736/// Used by services to announce their status (for "*")
737#[derive(Serialize, Deserialize)]
738pub struct ServiceStatusBroadcastEvent {
739    pub status: ServiceStatusBroadcast,
740}
741
742impl ServiceStatusBroadcastEvent {
743    #[inline]
744    pub fn ready() -> Self {
745        Self {
746            status: ServiceStatusBroadcast::Ready,
747        }
748    }
749    #[inline]
750    pub fn terminating() -> Self {
751        Self {
752            status: ServiceStatusBroadcast::Terminating,
753        }
754    }
755}
756
757/// Used by services and the core to notify about its state
758#[derive(Serialize, Deserialize)]
759#[serde(rename_all = "lowercase")]
760#[repr(u8)]
761pub enum ServiceStatusBroadcast {
762    Starting = 0,
763    Ready = 1,
764    Terminating = 0xef,
765    Unknown = 0xff,
766}
767
768impl fmt::Display for ServiceStatusBroadcast {
769    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
770        write!(
771            f,
772            "{}",
773            match self {
774                ServiceStatusBroadcast::Starting => "starting",
775                ServiceStatusBroadcast::Ready => "ready",
776                ServiceStatusBroadcast::Terminating => "terminating",
777                ServiceStatusBroadcast::Unknown => "unknown",
778            }
779        )
780    }
781}