1use anyhow::{Context, Result};
2use clap::{Parser, Subcommand};
3use futures::StreamExt;
4use orb8_proto::{
5 GetStatusRequest, OrbitAgentServiceClient, QueryFlowsRequest, StreamEventsRequest,
6};
7
8pub use orb8_proto::{AgentStatus, NetworkEvent, NetworkFlow};
9
10#[derive(Parser)]
11#[command(name = "orb8")]
12#[command(about = "eBPF-powered observability for Kubernetes", long_about = None)]
13#[command(version)]
14struct Cli {
15 #[arg(short, long, default_value = "localhost:9090", global = true)]
17 agent: String,
18
19 #[command(subcommand)]
20 command: Commands,
21}
22
23#[derive(Subcommand)]
24enum Commands {
25 Trace {
27 #[command(subcommand)]
28 kind: TraceKind,
29 },
30 Flows {
32 #[arg(short, long)]
34 namespace: Vec<String>,
35
36 #[arg(short, long)]
38 pod: Vec<String>,
39
40 #[arg(short, long, default_value = "20")]
42 limit: u32,
43 },
44 Status,
46}
47
48#[derive(Subcommand)]
49enum TraceKind {
50 Network {
52 #[arg(short, long)]
54 namespace: Vec<String>,
55
56 #[arg(short, long)]
58 duration: Option<String>,
59 },
60}
61
62pub async fn run() -> Result<()> {
63 let cli = Cli::parse();
64
65 match cli.command {
66 Commands::Trace { kind } => match kind {
67 TraceKind::Network {
68 namespace,
69 duration,
70 } => {
71 trace_network(&cli.agent, namespace, duration).await?;
72 }
73 },
74 Commands::Flows {
75 namespace,
76 pod,
77 limit,
78 } => {
79 query_flows(&cli.agent, namespace, pod, limit).await?;
80 }
81 Commands::Status => {
82 get_status(&cli.agent).await?;
83 }
84 }
85
86 Ok(())
87}
88
89async fn trace_network(
90 agent: &str,
91 namespaces: Vec<String>,
92 duration: Option<String>,
93) -> Result<()> {
94 let endpoint = format!("http://{}", agent);
95 let mut client = OrbitAgentServiceClient::connect(endpoint)
96 .await
97 .context("Failed to connect to agent")?;
98
99 let request = StreamEventsRequest {
100 namespaces: namespaces.clone(),
101 };
102
103 println!(
104 "Streaming network events from {}{}...",
105 agent,
106 if namespaces.is_empty() {
107 String::new()
108 } else {
109 format!(" (namespaces: {})", namespaces.join(", "))
110 }
111 );
112 println!(
113 "{:<20} {:<15} {:>21} {:>21} {:>8} {:>9} {:>7}",
114 "NAMESPACE/POD", "PROTOCOL", "SOURCE", "DESTINATION", "DIR", "BYTES", "TIME"
115 );
116 println!("{}", "-".repeat(110));
117
118 let duration_ms = duration.map(|d| parse_duration(&d)).transpose()?;
119 let start = std::time::Instant::now();
120
121 let mut stream = client.stream_events(request).await?.into_inner();
122
123 while let Some(result) = stream.next().await {
124 if let Some(max_ms) = duration_ms {
125 if start.elapsed().as_millis() as u64 >= max_ms {
126 println!("\nDuration reached, stopping trace.");
127 break;
128 }
129 }
130
131 match result {
132 Ok(event) => {
133 let ns_pod = format!("{}/{}", event.namespace, truncate(&event.pod_name, 12));
134 let src = format!("{}:{}", event.src_ip, event.src_port);
135 let dst = format!("{}:{}", event.dst_ip, event.dst_port);
136 let time = chrono::Local::now().format("%H:%M:%S%.3f");
137
138 println!(
139 "{:<20} {:<15} {:>21} {:>21} {:>8} {:>9} {:>7}",
140 truncate(&ns_pod, 20),
141 event.protocol,
142 src,
143 dst,
144 event.direction,
145 format_bytes(event.bytes as u64),
146 time
147 );
148 }
149 Err(e) => {
150 eprintln!("Stream error: {}", e);
151 break;
152 }
153 }
154 }
155
156 Ok(())
157}
158
159async fn query_flows(
160 agent: &str,
161 namespaces: Vec<String>,
162 pod_names: Vec<String>,
163 limit: u32,
164) -> Result<()> {
165 let endpoint = format!("http://{}", agent);
166 let mut client = OrbitAgentServiceClient::connect(endpoint)
167 .await
168 .context("Failed to connect to agent")?;
169
170 let request = QueryFlowsRequest {
171 namespaces,
172 pod_names,
173 limit,
174 };
175
176 let response = client.query_flows(request).await?.into_inner();
177
178 if response.flows.is_empty() {
179 println!("No flows found.");
180 return Ok(());
181 }
182
183 println!(
184 "{:<20} {:<15} {:>21} {:>21} {:>8} {:>9} {:>8}",
185 "NAMESPACE/POD", "PROTOCOL", "SOURCE", "DESTINATION", "DIR", "BYTES", "PACKETS"
186 );
187 println!("{}", "-".repeat(110));
188
189 for flow in response.flows {
190 let ns_pod = format!("{}/{}", flow.namespace, truncate(&flow.pod_name, 12));
191 let src = format!("{}:{}", flow.src_ip, flow.src_port);
192 let dst = format!("{}:{}", flow.dst_ip, flow.dst_port);
193
194 println!(
195 "{:<20} {:<15} {:>21} {:>21} {:>8} {:>9} {:>8}",
196 truncate(&ns_pod, 20),
197 flow.protocol,
198 src,
199 dst,
200 flow.direction,
201 format_bytes(flow.bytes),
202 flow.packets
203 );
204 }
205
206 Ok(())
207}
208
209async fn get_status(agent: &str) -> Result<()> {
210 let endpoint = format!("http://{}", agent);
211 let mut client = OrbitAgentServiceClient::connect(endpoint)
212 .await
213 .context("Failed to connect to agent")?;
214
215 let response = client.get_status(GetStatusRequest {}).await?.into_inner();
216
217 println!("Agent Status");
218 println!("{}", "-".repeat(40));
219 println!("Node: {}", response.node_name);
220 println!("Version: {}", response.version);
221 println!(
222 "Health: {}",
223 if response.healthy { "OK" } else { "UNHEALTHY" }
224 );
225 println!("Health Message: {}", response.health_message);
226 println!("Uptime: {}s", response.uptime_seconds);
227 println!("Events Processed: {}", response.events_processed);
228 println!("Events Dropped: {}", response.events_dropped);
229 println!("Pods Tracked: {}", response.pods_tracked);
230 println!("Active Flows: {}", response.active_flows);
231
232 Ok(())
233}
234
235fn truncate(s: &str, max_len: usize) -> String {
236 if s.len() <= max_len {
237 s.to_string()
238 } else {
239 format!("{}...", &s[..max_len - 3])
240 }
241}
242
243fn format_bytes(bytes: u64) -> String {
244 if bytes < 1024 {
245 format!("{}B", bytes)
246 } else if bytes < 1024 * 1024 {
247 format!("{:.1}KB", bytes as f64 / 1024.0)
248 } else if bytes < 1024 * 1024 * 1024 {
249 format!("{:.1}MB", bytes as f64 / (1024.0 * 1024.0))
250 } else {
251 format!("{:.1}GB", bytes as f64 / (1024.0 * 1024.0 * 1024.0))
252 }
253}
254
255fn parse_duration(s: &str) -> Result<u64> {
256 let s = s.trim();
257 let (num, unit) = if s.ends_with("ms") {
258 (s.trim_end_matches("ms"), 1u64)
259 } else if s.ends_with('s') {
260 (s.trim_end_matches('s'), 1000u64)
261 } else if s.ends_with('m') {
262 (s.trim_end_matches('m'), 60_000u64)
263 } else if s.ends_with('h') {
264 (s.trim_end_matches('h'), 3_600_000u64)
265 } else {
266 anyhow::bail!("Invalid duration format. Use: 30s, 5m, 1h, 500ms");
267 };
268
269 let value: u64 = num.parse().context("Invalid duration number")?;
270 Ok(value * unit)
271}