Skip to main content

mabi_cli/commands/
serve.rs

1use async_trait::async_trait;
2use mabi_runtime::{
3    ProtocolLaunchSpec, RuntimeExtensions, RuntimeSession, RuntimeSessionSpec, ServiceSnapshot,
4};
5use tokio::time::Duration;
6
7use crate::context::CliContext;
8use crate::error::{CliError, CliResult};
9use crate::output::OutputFormat;
10use crate::runner::{Command, CommandOutput};
11use crate::runtime_registry::workspace_protocol_registry;
12
13/// Runtime-backed serve command for protocol services.
14#[derive(Clone)]
15pub struct ServeRuntimeCommand {
16    launch: ProtocolLaunchSpec,
17    readiness_timeout: Duration,
18    extensions: RuntimeExtensions,
19}
20
21impl ServeRuntimeCommand {
22    pub fn new(
23        launch: ProtocolLaunchSpec,
24        readiness_timeout: Duration,
25        extensions: RuntimeExtensions,
26    ) -> Self {
27        Self {
28            launch,
29            readiness_timeout,
30            extensions,
31        }
32    }
33
34    fn render_started(&self, ctx: &CliContext, snapshot: &ServiceSnapshot) -> CliResult<()> {
35        let output = ctx.output();
36        if matches!(
37            output.format(),
38            OutputFormat::Json | OutputFormat::Yaml | OutputFormat::Compact
39        ) {
40            output.write(snapshot)?;
41            return Ok(());
42        }
43
44        output.header(format!("{} Service", snapshot.name));
45        if let Some(protocol) = snapshot.protocol {
46            output.kv("Protocol", format!("{:?}", protocol));
47        }
48        output.kv("State", format!("{:?}", snapshot.status.state));
49        for (key, value) in &snapshot.metadata {
50            if key.starts_with('_') {
51                continue;
52            }
53            match value {
54                serde_json::Value::String(value) => output.kv(key, value),
55                serde_json::Value::Number(value) => output.kv(key, value),
56                serde_json::Value::Bool(value) => output.kv(key, value),
57                _ => output.kv(key, value),
58            }
59        }
60        Ok(())
61    }
62}
63
64#[async_trait]
65impl Command for ServeRuntimeCommand {
66    fn name(&self) -> &str {
67        "serve"
68    }
69
70    fn description(&self) -> &str {
71        "Serve a protocol simulator through the shared runtime"
72    }
73
74    fn supports_shutdown(&self) -> bool {
75        true
76    }
77
78    async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
79        let registry = workspace_protocol_registry();
80        let session = RuntimeSession::new(
81            RuntimeSessionSpec {
82                services: vec![self.launch.clone()],
83                readiness_timeout: Some(self.readiness_timeout.as_millis() as u64),
84            },
85            &registry,
86            self.extensions.clone(),
87        )
88        .await?;
89
90        session.start(self.readiness_timeout).await?;
91        let snapshot = session
92            .snapshots()
93            .await?
94            .into_iter()
95            .next()
96            .ok_or_else(|| CliError::ExecutionFailed {
97                message: "runtime session did not return a service snapshot".into(),
98            })?;
99
100        self.render_started(ctx, &snapshot)?;
101
102        if !ctx.is_quiet() {
103            ctx.output().info("Press Ctrl+C to stop");
104        }
105
106        let shutdown_signal = ctx.shutdown_signal();
107        tokio::select! {
108            _ = shutdown_signal.notified() => {
109                session.stop().await?;
110                if !ctx.is_quiet() {
111                    ctx.output().success(format!("{} stopped", snapshot.name));
112                }
113                Ok(CommandOutput::quiet_success())
114            }
115            result = async {
116                for handle in session.handles() {
117                    handle.wait().await?;
118                }
119                Ok::<(), mabi_runtime::RuntimeError>(())
120            } => {
121                result?;
122                let final_snapshot = session
123                    .snapshots()
124                    .await?
125                    .into_iter()
126                    .next()
127                    .unwrap_or(snapshot);
128                if final_snapshot.status.state == mabi_runtime::ServiceState::Stopped {
129                    if !ctx.is_quiet() {
130                        ctx.output().info(format!("{} exited cleanly", final_snapshot.name));
131                    }
132                    Ok(CommandOutput::quiet_success())
133                } else {
134                    Err(CliError::ExecutionFailed {
135                        message: format!(
136                            "service terminated unexpectedly: {:?}",
137                            final_snapshot.status.state
138                        ),
139                    })
140                }
141            }
142        }
143    }
144}