ssh_commander_core/tools/
tcpdump.rs1use crate::event_bus::{CoreEvent, event_sender};
13use crate::ssh::SshClient;
14use crate::tools::ToolsError;
15use std::collections::HashMap;
16use std::sync::Arc;
17use std::sync::Mutex;
18use std::sync::OnceLock;
19use std::sync::atomic::{AtomicU64, Ordering};
20use tokio_util::sync::CancellationToken;
21
22#[derive(Debug, Clone)]
24pub struct TcpdumpEvent {
25 pub capture_id: u64,
26 pub line: String,
27 pub is_stderr: bool,
30}
31
32type CaptureMap = Arc<Mutex<HashMap<u64, CancellationToken>>>;
33
34pub struct TcpdumpRegistry {
36 next_id: AtomicU64,
37 captures: CaptureMap,
38}
39
40impl Default for TcpdumpRegistry {
41 fn default() -> Self {
42 Self::new()
43 }
44}
45
46impl TcpdumpRegistry {
47 pub fn new() -> Self {
48 Self {
49 next_id: AtomicU64::new(1),
50 captures: Arc::new(Mutex::new(HashMap::new())),
51 }
52 }
53
54 pub fn global() -> &'static Self {
57 static REGISTRY: OnceLock<TcpdumpRegistry> = OnceLock::new();
58 REGISTRY.get_or_init(TcpdumpRegistry::new)
59 }
60
61 pub async fn start(
69 &self,
70 client: &SshClient,
71 interface: &str,
72 filter: &str,
73 snaplen: Option<u32>,
74 ) -> Result<u64, ToolsError> {
75 validate_interface(interface)?;
76 validate_filter(filter)?;
77
78 let snap = snaplen.unwrap_or(96);
79 let cmd = if filter.trim().is_empty() {
80 format!("sudo -n tcpdump -lnn -s {snap} -i {interface}")
81 } else {
82 format!("sudo -n tcpdump -lnn -s {snap} -i {interface} '{filter}'")
83 };
84
85 let (mut rx, cancel) = client
86 .execute_command_streaming(&cmd)
87 .await
88 .map_err(|e| ToolsError::SshExec(e.to_string()))?;
89
90 let id = self.next_id.fetch_add(1, Ordering::SeqCst);
91 if let Ok(mut map) = self.captures.lock() {
92 map.insert(id, cancel.clone());
93 }
94
95 let captures = self.captures.clone();
96 tokio::spawn(async move {
97 let tx = event_sender();
98 while let Some(line) = rx.recv().await {
99 let (is_stderr, payload) = if let Some(rest) = line.strip_prefix('!') {
100 (true, rest.to_string())
101 } else {
102 (false, line)
103 };
104 if let Some(ref tx) = tx {
105 let _ = tx.send(CoreEvent::TcpdumpLine {
106 capture_id: id,
107 line: payload,
108 is_stderr,
109 });
110 }
111 }
112 if let Ok(mut map) = captures.lock() {
115 map.remove(&id);
116 }
117 });
118
119 Ok(id)
120 }
121
122 pub fn stop(&self, id: u64) -> Result<(), ToolsError> {
123 let token = {
124 let mut map = self
125 .captures
126 .lock()
127 .map_err(|e| ToolsError::Parse(format!("lock poisoned: {e}")))?;
128 map.remove(&id)
129 };
130 match token {
131 Some(t) => {
132 t.cancel();
133 Ok(())
134 }
135 None => Err(ToolsError::CaptureNotFound(id)),
136 }
137 }
138}
139
140fn validate_interface(iface: &str) -> Result<(), ToolsError> {
141 if iface.is_empty()
142 || iface.len() > 32
143 || !iface
144 .chars()
145 .all(|c| c.is_ascii_alphanumeric() || matches!(c, '.' | '-' | '_' | ':'))
146 {
147 return Err(ToolsError::Parse(format!("invalid interface: {iface}")));
148 }
149 Ok(())
150}
151
152fn validate_filter(filter: &str) -> Result<(), ToolsError> {
153 if filter.contains('\'') {
157 return Err(ToolsError::Parse(
158 "filter may not contain single quotes".into(),
159 ));
160 }
161 if filter.len() > 4096 {
162 return Err(ToolsError::Parse("filter too long".into()));
163 }
164 Ok(())
165}
166
167#[cfg(test)]
168mod tests {
169 use super::*;
170
171 #[test]
172 fn rejects_bad_interface() {
173 assert!(validate_interface("eth0; rm -rf /").is_err());
174 assert!(validate_interface("").is_err());
175 assert!(validate_interface("eth0").is_ok());
176 assert!(validate_interface("any").is_ok());
177 assert!(validate_interface("en0:vlan100").is_ok());
178 }
179
180 #[test]
181 fn rejects_filter_with_quotes() {
182 assert!(validate_filter("port 80").is_ok());
183 assert!(validate_filter("port 80 and 'evil'").is_err());
184 assert!(validate_filter("").is_ok());
185 }
186}