Skip to main content

orb8_cli/
lib.rs

1use anyhow::{Context, Result};
2use clap::{Parser, Subcommand};
3use futures::StreamExt;
4use orb8_proto::{
5    GetStatusRequest, OrbitAgentServiceClient, QueryFlowsRequest, StreamEventsRequest,
6};
7
8pub use orb8_proto::{AgentStatus, NetworkEvent, NetworkFlow};
9
10#[derive(Parser)]
11#[command(name = "orb8")]
12#[command(about = "eBPF-powered observability for Kubernetes", long_about = None)]
13#[command(version)]
14struct Cli {
15    /// Agent address (host:port)
16    #[arg(short, long, default_value = "localhost:9090", global = true)]
17    agent: String,
18
19    #[command(subcommand)]
20    command: Commands,
21}
22
23#[derive(Subcommand)]
24enum Commands {
25    /// Stream live network events
26    Trace {
27        #[command(subcommand)]
28        kind: TraceKind,
29    },
30    /// Query aggregated network flows
31    Flows {
32        /// Filter by namespace(s)
33        #[arg(short, long)]
34        namespace: Vec<String>,
35
36        /// Filter by pod name(s)
37        #[arg(short, long)]
38        pod: Vec<String>,
39
40        /// Maximum number of flows to return
41        #[arg(short, long, default_value = "20")]
42        limit: u32,
43    },
44    /// Get agent status
45    Status,
46}
47
48#[derive(Subcommand)]
49enum TraceKind {
50    /// Trace network events
51    Network {
52        /// Filter by namespace(s)
53        #[arg(short, long)]
54        namespace: Vec<String>,
55
56        /// Duration to trace (e.g., "30s", "5m"). Runs indefinitely if not specified.
57        #[arg(short, long)]
58        duration: Option<String>,
59    },
60}
61
62pub async fn run() -> Result<()> {
63    let cli = Cli::parse();
64
65    match cli.command {
66        Commands::Trace { kind } => match kind {
67            TraceKind::Network {
68                namespace,
69                duration,
70            } => {
71                trace_network(&cli.agent, namespace, duration).await?;
72            }
73        },
74        Commands::Flows {
75            namespace,
76            pod,
77            limit,
78        } => {
79            query_flows(&cli.agent, namespace, pod, limit).await?;
80        }
81        Commands::Status => {
82            get_status(&cli.agent).await?;
83        }
84    }
85
86    Ok(())
87}
88
89async fn trace_network(
90    agent: &str,
91    namespaces: Vec<String>,
92    duration: Option<String>,
93) -> Result<()> {
94    let endpoint = format!("http://{}", agent);
95    let mut client = OrbitAgentServiceClient::connect(endpoint)
96        .await
97        .context("Failed to connect to agent")?;
98
99    let request = StreamEventsRequest {
100        namespaces: namespaces.clone(),
101    };
102
103    println!(
104        "Streaming network events from {}{}...",
105        agent,
106        if namespaces.is_empty() {
107            String::new()
108        } else {
109            format!(" (namespaces: {})", namespaces.join(", "))
110        }
111    );
112    println!(
113        "{:<20} {:<15} {:>21} {:>21} {:>8} {:>9} {:>7}",
114        "NAMESPACE/POD", "PROTOCOL", "SOURCE", "DESTINATION", "DIR", "BYTES", "TIME"
115    );
116    println!("{}", "-".repeat(110));
117
118    let duration_ms = duration.map(|d| parse_duration(&d)).transpose()?;
119    let start = std::time::Instant::now();
120
121    let mut stream = client.stream_events(request).await?.into_inner();
122
123    while let Some(result) = stream.next().await {
124        if let Some(max_ms) = duration_ms {
125            if start.elapsed().as_millis() as u64 >= max_ms {
126                println!("\nDuration reached, stopping trace.");
127                break;
128            }
129        }
130
131        match result {
132            Ok(event) => {
133                let ns_pod = format!("{}/{}", event.namespace, truncate(&event.pod_name, 12));
134                let src = format!("{}:{}", event.src_ip, event.src_port);
135                let dst = format!("{}:{}", event.dst_ip, event.dst_port);
136                let time = chrono::Local::now().format("%H:%M:%S%.3f");
137
138                println!(
139                    "{:<20} {:<15} {:>21} {:>21} {:>8} {:>9} {:>7}",
140                    truncate(&ns_pod, 20),
141                    event.protocol,
142                    src,
143                    dst,
144                    event.direction,
145                    format_bytes(event.bytes as u64),
146                    time
147                );
148            }
149            Err(e) => {
150                eprintln!("Stream error: {}", e);
151                break;
152            }
153        }
154    }
155
156    Ok(())
157}
158
159async fn query_flows(
160    agent: &str,
161    namespaces: Vec<String>,
162    pod_names: Vec<String>,
163    limit: u32,
164) -> Result<()> {
165    let endpoint = format!("http://{}", agent);
166    let mut client = OrbitAgentServiceClient::connect(endpoint)
167        .await
168        .context("Failed to connect to agent")?;
169
170    let request = QueryFlowsRequest {
171        namespaces,
172        pod_names,
173        limit,
174    };
175
176    let response = client.query_flows(request).await?.into_inner();
177
178    if response.flows.is_empty() {
179        println!("No flows found.");
180        return Ok(());
181    }
182
183    println!(
184        "{:<20} {:<15} {:>21} {:>21} {:>8} {:>9} {:>8}",
185        "NAMESPACE/POD", "PROTOCOL", "SOURCE", "DESTINATION", "DIR", "BYTES", "PACKETS"
186    );
187    println!("{}", "-".repeat(110));
188
189    for flow in response.flows {
190        let ns_pod = format!("{}/{}", flow.namespace, truncate(&flow.pod_name, 12));
191        let src = format!("{}:{}", flow.src_ip, flow.src_port);
192        let dst = format!("{}:{}", flow.dst_ip, flow.dst_port);
193
194        println!(
195            "{:<20} {:<15} {:>21} {:>21} {:>8} {:>9} {:>8}",
196            truncate(&ns_pod, 20),
197            flow.protocol,
198            src,
199            dst,
200            flow.direction,
201            format_bytes(flow.bytes),
202            flow.packets
203        );
204    }
205
206    Ok(())
207}
208
209async fn get_status(agent: &str) -> Result<()> {
210    let endpoint = format!("http://{}", agent);
211    let mut client = OrbitAgentServiceClient::connect(endpoint)
212        .await
213        .context("Failed to connect to agent")?;
214
215    let response = client.get_status(GetStatusRequest {}).await?.into_inner();
216
217    println!("Agent Status");
218    println!("{}", "-".repeat(40));
219    println!("Node:             {}", response.node_name);
220    println!("Version:          {}", response.version);
221    println!(
222        "Health:           {}",
223        if response.healthy { "OK" } else { "UNHEALTHY" }
224    );
225    println!("Health Message:   {}", response.health_message);
226    println!("Uptime:           {}s", response.uptime_seconds);
227    println!("Events Processed: {}", response.events_processed);
228    println!("Events Dropped:   {}", response.events_dropped);
229    println!("Pods Tracked:     {}", response.pods_tracked);
230    println!("Active Flows:     {}", response.active_flows);
231
232    Ok(())
233}
234
235fn truncate(s: &str, max_len: usize) -> String {
236    if s.len() <= max_len {
237        s.to_string()
238    } else {
239        format!("{}...", &s[..max_len - 3])
240    }
241}
242
243fn format_bytes(bytes: u64) -> String {
244    if bytes < 1024 {
245        format!("{}B", bytes)
246    } else if bytes < 1024 * 1024 {
247        format!("{:.1}KB", bytes as f64 / 1024.0)
248    } else if bytes < 1024 * 1024 * 1024 {
249        format!("{:.1}MB", bytes as f64 / (1024.0 * 1024.0))
250    } else {
251        format!("{:.1}GB", bytes as f64 / (1024.0 * 1024.0 * 1024.0))
252    }
253}
254
255fn parse_duration(s: &str) -> Result<u64> {
256    let s = s.trim();
257    let (num, unit) = if s.ends_with("ms") {
258        (s.trim_end_matches("ms"), 1u64)
259    } else if s.ends_with('s') {
260        (s.trim_end_matches('s'), 1000u64)
261    } else if s.ends_with('m') {
262        (s.trim_end_matches('m'), 60_000u64)
263    } else if s.ends_with('h') {
264        (s.trim_end_matches('h'), 3_600_000u64)
265    } else {
266        anyhow::bail!("Invalid duration format. Use: 30s, 5m, 1h, 500ms");
267    };
268
269    let value: u64 = num.parse().context("Invalid duration number")?;
270    Ok(value * unit)
271}