1use crate::cli::TraceArgs;
7use crate::errors::CliError;
8use crate::output::terminal;
9use crate::output::{CommandResult, HumanRenderable, OutputContext};
10use serde::Serialize;
11
12#[derive(Debug, Serialize)]
14pub struct TraceResult {
15 pub elements_traced: u64,
17 pub avg_latency_us: f64,
19 pub p50_latency_us: f64,
21 pub p99_latency_us: f64,
23 pub total_copies: u64,
25 pub buffer_reuse_pct: f64,
27 pub backpressure_events: u64,
29 pub traces: Vec<ElementTrace>,
31 pub flow_name: String,
33}
34
35#[derive(Debug, Serialize)]
37pub struct ElementTrace {
38 pub element_id: u64,
40 pub spans: Vec<ComponentSpan>,
42 pub total_latency_us: f64,
44}
45
46#[derive(Debug, Serialize)]
48pub struct ComponentSpan {
49 pub component: String,
51 pub operation: String,
53 pub duration_us: f64,
55 pub buffer_info: Option<String>,
57}
58
59impl HumanRenderable for TraceResult {
60 fn render_human(&self, ctx: &OutputContext) {
61 for trace in &self.traces {
62 eprintln!();
63 let elem_label = format!("elem-{}", trace.element_id);
64 if ctx.color_enabled {
65 eprint!(" {} ", console::style(&elem_label).bold());
66 } else {
67 eprint!(" {elem_label} ");
68 }
69
70 for (i, span) in trace.spans.iter().enumerate() {
71 let connector = if i == 0 {
72 "┬─"
73 } else if i == trace.spans.len() - 1 {
74 "└─"
75 } else {
76 "├─"
77 };
78
79 let buf_str = span
80 .buffer_info
81 .as_deref()
82 .map(|b| format!(" {b}"))
83 .unwrap_or_default();
84
85 if i > 0 {
86 eprint!(" ");
87 }
88 eprintln!(
89 "{connector} {:<14} {:<8} {:.1}µs{buf_str}",
90 span.component, span.operation, span.duration_us,
91 );
92 }
93 eprintln!(" total: {:.1}µs", trace.total_latency_us);
94 }
95
96 terminal::print_header(ctx, "Trace Summary");
97 terminal::print_kv(ctx, "Elements traced", &format!("{}", self.elements_traced));
98 terminal::print_kv(
99 ctx,
100 "Avg latency",
101 &format!(
102 "{:.1}µs (p50: {:.1}µs, p99: {:.1}µs)",
103 self.avg_latency_us, self.p50_latency_us, self.p99_latency_us
104 ),
105 );
106 terminal::print_kv(ctx, "Copies", &format!("{}", self.total_copies));
107 terminal::print_kv(
108 ctx,
109 "Buffer reuse",
110 &format!("{:.0}%", self.buffer_reuse_pct),
111 );
112 terminal::print_kv(
113 ctx,
114 "Backpressure",
115 &format!("{} events", self.backpressure_events),
116 );
117 }
118}
119
120pub async fn execute(
130 args: &TraceArgs,
131 ctx: &OutputContext,
132) -> Result<CommandResult<TraceResult>, CliError> {
133 let manifest_path = &args.manifest;
134
135 if !manifest_path.exists() {
136 return Err(CliError::Config {
137 detail: format!("Manifest not found: {}", manifest_path.display()),
138 file: Some(manifest_path.display().to_string()),
139 suggestion: "Run this command from a Torvyn project directory.".into(),
140 });
141 }
142
143 let manifest_content = std::fs::read_to_string(manifest_path).map_err(|e| CliError::Io {
144 detail: e.to_string(),
145 path: Some(manifest_path.display().to_string()),
146 })?;
147
148 let manifest = torvyn_config::ComponentManifest::from_toml_str(
149 &manifest_content,
150 manifest_path.to_str().unwrap_or("Torvyn.toml"),
151 )
152 .map_err(|errors| CliError::Config {
153 detail: format!("Manifest has {} error(s)", errors.len()),
154 file: Some(manifest_path.display().to_string()),
155 suggestion: "Run `torvyn check` first.".into(),
156 })?;
157
158 let flow_name = args
159 .flow
160 .clone()
161 .or_else(|| manifest.flow.keys().next().cloned())
162 .ok_or_else(|| CliError::Config {
163 detail: "No flow defined in manifest".into(),
164 file: Some(manifest_path.display().to_string()),
165 suggestion: "Add a [flow.*] section or use --flow <name>.".into(),
166 })?;
167
168 let limit_label = args
169 .limit
170 .map(|l| format!("limit: {l} elements"))
171 .unwrap_or_else(|| "no limit".into());
172
173 if !ctx.quiet && ctx.format == crate::cli::OutputFormat::Human {
174 eprintln!("▶ Tracing flow \"{flow_name}\" ({limit_label})");
175 }
176
177 let obs_config = torvyn_config::ObservabilityConfig {
178 tracing_enabled: true,
179 tracing_sample_rate: 1.0,
180 ..Default::default()
181 };
182
183 let mut host = torvyn_host::HostBuilder::new()
184 .with_config_file(manifest_path)
185 .with_observability_config(obs_config)
186 .build()
187 .await
188 .map_err(|e| CliError::Runtime {
189 detail: format!("Failed to initialize host: {e}"),
190 context: None,
191 })?;
192
193 let _flow_id = host
194 .start_flow(&flow_name)
195 .await
196 .map_err(|e| CliError::Runtime {
197 detail: format!("Failed to start flow: {e}"),
198 context: Some(flow_name.clone()),
199 })?;
200
201 host.run().await.map_err(|e| CliError::Runtime {
203 detail: format!("Pipeline execution failed: {e}"),
204 context: Some(flow_name.clone()),
205 })?;
206
207 let result = TraceResult {
209 elements_traced: 0,
210 avg_latency_us: 0.0,
211 p50_latency_us: 0.0,
212 p99_latency_us: 0.0,
213 total_copies: 0,
214 buffer_reuse_pct: 0.0,
215 backpressure_events: 0,
216 traces: vec![],
217 flow_name,
218 };
219
220 Ok(CommandResult {
221 success: true,
222 command: "trace".into(),
223 data: result,
224 warnings: vec![],
225 })
226}