Skip to main content

mabi_cli/commands/
serve.rs

1use async_trait::async_trait;
2use mabi_runtime::{
3    ProtocolLaunchSpec, RuntimeExtensions, RuntimeSession, RuntimeSessionSpec, ServiceSnapshot,
4};
5use tokio::time::Duration;
6
7use crate::context::CliContext;
8use crate::error::{CliError, CliResult};
9use crate::output::OutputFormat;
10use crate::runner::{Command, CommandOutput};
11use crate::runtime_registry::workspace_protocol_registry;
12
/// Runtime-backed serve command for protocol services.
///
/// Holds everything `execute` needs to build and start a single-service
/// `RuntimeSession`.
#[derive(Clone)]
pub struct ServeRuntimeCommand {
    /// Launch spec of the service; cloned in as the only entry of the session's service list.
    launch: ProtocolLaunchSpec,
    /// Readiness timeout; stored in the session spec (as milliseconds) and passed to `start`.
    readiness_timeout: Duration,
    /// Runtime extensions; cloned into the session when the command executes.
    extensions: RuntimeExtensions,
}
20
21impl ServeRuntimeCommand {
22    pub fn new(
23        launch: ProtocolLaunchSpec,
24        readiness_timeout: Duration,
25        extensions: RuntimeExtensions,
26    ) -> Self {
27        Self {
28            launch,
29            readiness_timeout,
30            extensions,
31        }
32    }
33
34    fn render_started(&self, ctx: &CliContext, snapshot: &ServiceSnapshot) -> CliResult<()> {
35        let output = ctx.output();
36        if matches!(
37            output.format(),
38            OutputFormat::Json | OutputFormat::Yaml | OutputFormat::Compact
39        ) {
40            output.write(snapshot)?;
41            return Ok(());
42        }
43
44        output.header(format!("{} Service", snapshot.name));
45        if let Some(protocol) = snapshot.protocol {
46            output.kv("Protocol", format!("{:?}", protocol));
47        }
48        output.kv("State", format!("{:?}", snapshot.status.state));
49        for (key, value) in &snapshot.metadata {
50            match value {
51                serde_json::Value::String(value) => output.kv(key, value),
52                serde_json::Value::Number(value) => output.kv(key, value),
53                serde_json::Value::Bool(value) => output.kv(key, value),
54                _ => output.kv(key, value),
55            }
56        }
57        Ok(())
58    }
59}
60
61#[async_trait]
62impl Command for ServeRuntimeCommand {
63    fn name(&self) -> &str {
64        "serve"
65    }
66
67    fn description(&self) -> &str {
68        "Serve a protocol simulator through the shared runtime"
69    }
70
71    fn supports_shutdown(&self) -> bool {
72        true
73    }
74
75    async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
76        let registry = workspace_protocol_registry();
77        let session = RuntimeSession::new(
78            RuntimeSessionSpec {
79                services: vec![self.launch.clone()],
80                readiness_timeout: Some(self.readiness_timeout.as_millis() as u64),
81            },
82            &registry,
83            self.extensions.clone(),
84        )
85        .await?;
86
87        session.start(self.readiness_timeout).await?;
88        let snapshot = session
89            .snapshots()
90            .await?
91            .into_iter()
92            .next()
93            .ok_or_else(|| CliError::ExecutionFailed {
94                message: "runtime session did not return a service snapshot".into(),
95            })?;
96
97        self.render_started(ctx, &snapshot)?;
98
99        if !ctx.is_quiet() {
100            ctx.output().info("Press Ctrl+C to stop");
101        }
102
103        let shutdown_signal = ctx.shutdown_signal();
104        tokio::select! {
105            _ = shutdown_signal.notified() => {
106                session.stop().await?;
107                if !ctx.is_quiet() {
108                    ctx.output().success(format!("{} stopped", snapshot.name));
109                }
110                Ok(CommandOutput::quiet_success())
111            }
112            result = async {
113                for handle in session.handles() {
114                    handle.wait().await?;
115                }
116                Ok::<(), mabi_runtime::RuntimeError>(())
117            } => {
118                result?;
119                let final_snapshot = session
120                    .snapshots()
121                    .await?
122                    .into_iter()
123                    .next()
124                    .unwrap_or(snapshot);
125                if final_snapshot.status.state == mabi_runtime::ServiceState::Stopped {
126                    if !ctx.is_quiet() {
127                        ctx.output().info(format!("{} exited cleanly", final_snapshot.name));
128                    }
129                    Ok(CommandOutput::quiet_success())
130                } else {
131                    Err(CliError::ExecutionFailed {
132                        message: format!(
133                            "service terminated unexpectedly: {:?}",
134                            final_snapshot.status.state
135                        ),
136                    })
137                }
138            }
139        }
140    }
141}