mabi_cli/commands/
serve.rs1use async_trait::async_trait;
2use mabi_runtime::{
3 ProtocolLaunchSpec, RuntimeExtensions, RuntimeSession, RuntimeSessionSpec, ServiceSnapshot,
4};
5use tokio::time::Duration;
6
7use crate::context::CliContext;
8use crate::error::{CliError, CliResult};
9use crate::output::OutputFormat;
10use crate::runner::{Command, CommandOutput};
11use crate::runtime_registry::workspace_protocol_registry;
12
13#[derive(Clone)]
15pub struct ServeRuntimeCommand {
16 launch: ProtocolLaunchSpec,
17 readiness_timeout: Duration,
18 extensions: RuntimeExtensions,
19}
20
21impl ServeRuntimeCommand {
22 pub fn new(
23 launch: ProtocolLaunchSpec,
24 readiness_timeout: Duration,
25 extensions: RuntimeExtensions,
26 ) -> Self {
27 Self {
28 launch,
29 readiness_timeout,
30 extensions,
31 }
32 }
33
34 fn render_started(&self, ctx: &CliContext, snapshot: &ServiceSnapshot) -> CliResult<()> {
35 let output = ctx.output();
36 if matches!(
37 output.format(),
38 OutputFormat::Json | OutputFormat::Yaml | OutputFormat::Compact
39 ) {
40 output.write(snapshot)?;
41 return Ok(());
42 }
43
44 output.header(format!("{} Service", snapshot.name));
45 if let Some(protocol) = snapshot.protocol {
46 output.kv("Protocol", format!("{:?}", protocol));
47 }
48 output.kv("State", format!("{:?}", snapshot.status.state));
49 for (key, value) in &snapshot.metadata {
50 if key.starts_with('_') {
51 continue;
52 }
53 match value {
54 serde_json::Value::String(value) => output.kv(key, value),
55 serde_json::Value::Number(value) => output.kv(key, value),
56 serde_json::Value::Bool(value) => output.kv(key, value),
57 _ => output.kv(key, value),
58 }
59 }
60 Ok(())
61 }
62}
63
64#[async_trait]
65impl Command for ServeRuntimeCommand {
66 fn name(&self) -> &str {
67 "serve"
68 }
69
70 fn description(&self) -> &str {
71 "Serve a protocol simulator through the shared runtime"
72 }
73
74 fn supports_shutdown(&self) -> bool {
75 true
76 }
77
78 async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
79 let registry = workspace_protocol_registry();
80 let session = RuntimeSession::new(
81 RuntimeSessionSpec {
82 services: vec![self.launch.clone()],
83 readiness_timeout: Some(self.readiness_timeout.as_millis() as u64),
84 },
85 ®istry,
86 self.extensions.clone(),
87 )
88 .await?;
89
90 session.start(self.readiness_timeout).await?;
91 let snapshot = session
92 .snapshots()
93 .await?
94 .into_iter()
95 .next()
96 .ok_or_else(|| CliError::ExecutionFailed {
97 message: "runtime session did not return a service snapshot".into(),
98 })?;
99
100 self.render_started(ctx, &snapshot)?;
101
102 if !ctx.is_quiet() {
103 ctx.output().info("Press Ctrl+C to stop");
104 }
105
106 let shutdown_signal = ctx.shutdown_signal();
107 tokio::select! {
108 _ = shutdown_signal.notified() => {
109 session.stop().await?;
110 if !ctx.is_quiet() {
111 ctx.output().success(format!("{} stopped", snapshot.name));
112 }
113 Ok(CommandOutput::quiet_success())
114 }
115 result = async {
116 for handle in session.handles() {
117 handle.wait().await?;
118 }
119 Ok::<(), mabi_runtime::RuntimeError>(())
120 } => {
121 result?;
122 let final_snapshot = session
123 .snapshots()
124 .await?
125 .into_iter()
126 .next()
127 .unwrap_or(snapshot);
128 if final_snapshot.status.state == mabi_runtime::ServiceState::Stopped {
129 if !ctx.is_quiet() {
130 ctx.output().info(format!("{} exited cleanly", final_snapshot.name));
131 }
132 Ok(CommandOutput::quiet_success())
133 } else {
134 Err(CliError::ExecutionFailed {
135 message: format!(
136 "service terminated unexpectedly: {:?}",
137 final_snapshot.status.state
138 ),
139 })
140 }
141 }
142 }
143 }
144}