hdc_rs/client.rs
1//! HDC client implementation
2
3use std::time::Duration;
4use tokio::net::TcpStream;
5use tokio::time::timeout;
6use tracing::{debug, info, warn};
7
8use crate::error::{HdcError, Result};
9use crate::protocol::{ChannelHandShake, HdcCommand, PacketCodec};
10
/// Upper bound on how long a single TCP connection attempt to the HDC server may take.
///
/// Used as the deadline passed to `tokio::time::timeout` around
/// `TcpStream::connect`; when it elapses the attempt is reported as
/// `HdcError::Timeout`.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
13
/// HDC client for communicating with an HDC server over TCP.
///
/// A client holds at most one TCP stream at a time. A channel handshake is
/// run right after a stream is opened, and commands are only accepted once
/// that handshake has completed (see `is_connected`).
pub struct HdcClient {
    /// TCP stream to the HDC server; `None` while disconnected.
    stream: Option<TcpStream>,
    /// Server address, handed unchanged to `TcpStream::connect`.
    address: String,
    /// Packet codec used to read and write packets on the stream.
    codec: PacketCodec,
    /// Channel ID taken from the server's handshake (0 until a handshake was processed).
    channel_id: u32,
    /// Whether the channel handshake has completed.
    handshake_ok: bool,
    /// Connect key (device identifier) used for the most recent device handshake, if any.
    connect_key: Option<String>,
}
29
30impl HdcClient {
31 /// Create a new HDC client (not connected)
32 pub fn new(address: impl Into<String>) -> Self {
33 Self {
34 stream: None,
35 address: address.into(),
36 codec: PacketCodec::new(),
37 channel_id: 0,
38 handshake_ok: false,
39 connect_key: None,
40 }
41 }
42
43 /// Connect to HDC server
44 pub async fn connect(address: impl Into<String>) -> Result<Self> {
45 let mut client = Self::new(address);
46 client.connect_internal().await?;
47 Ok(client)
48 }
49
50 /// Internal connection method
51 async fn connect_internal(&mut self) -> Result<()> {
52 info!("Connecting to HDC server at {}", self.address);
53
54 let stream = timeout(DEFAULT_TIMEOUT, TcpStream::connect(&self.address))
55 .await
56 .map_err(|_| HdcError::Timeout)?
57 .map_err(HdcError::Io)?;
58
59 info!("Connected to HDC server");
60 self.stream = Some(stream);
61
62 // Perform channel handshake
63 self.perform_handshake(None).await?;
64
65 Ok(())
66 }
67
68 /// Perform channel handshake with server
69 async fn perform_handshake(&mut self, connect_key: Option<&str>) -> Result<()> {
70 let stream = self.stream.as_mut().ok_or(HdcError::NotConnected)?;
71
72 info!("Starting channel handshake");
73
74 // Step 1: Read handshake from server
75 let handshake_data = self.codec.read_packet(stream).await?;
76 let received_size = handshake_data.len();
77 debug!("Received handshake data: {} bytes", received_size);
78
79 let mut handshake = ChannelHandShake::from_bytes(&handshake_data)?;
80
81 // Step 2: Verify banner
82 handshake.verify_banner()?;
83 info!("Banner verified: {:?}", &handshake.banner[..8]);
84
85 // Step 3: Extract channel ID
86 self.channel_id = handshake.get_channel_id();
87 info!("Assigned channel ID: {}", self.channel_id);
88
89 // Step 4: Check features
90 let is_stable = handshake.is_stable_buf();
91 debug!("Server stable buffer mode: {}", is_stable);
92
93 // Step 5: Set connect key and send response
94 if let Some(key) = connect_key {
95 handshake.set_connect_key(key);
96 self.connect_key = Some(key.to_string());
97 info!("Using connect key: {}", key);
98 } else {
99 // Empty connect key for initial connection
100 handshake.set_connect_key("");
101 }
102
103 // Send handshake response with same format as received
104 // If server sent 44 bytes (without version), respond with 44 bytes
105 // If server sent 108 bytes (with version), respond with 108 bytes
106 let response = if received_size >= ChannelHandShake::SIZE {
107 debug!("Sending full handshake response (108 bytes)");
108 handshake.to_bytes()
109 } else {
110 debug!("Sending handshake response without version (44 bytes)");
111 handshake.to_bytes_without_version()
112 };
113
114 self.codec.write_packet(stream, &response).await?;
115
116 self.handshake_ok = true;
117 info!("Channel handshake completed successfully");
118
119 Ok(())
120 }
121
122 /// Get the channel ID
123 pub fn channel_id(&self) -> u32 {
124 self.channel_id
125 }
126
127 /// Check if handshake is complete
128 pub fn is_connected(&self) -> bool {
129 self.handshake_ok && self.stream.is_some()
130 }
131
132 /// Send raw command string to server
133 ///
134 /// This is used for simple commands like "list targets", "shell ls", etc.
135 pub async fn send_command(&mut self, command: &str) -> Result<()> {
136 if !self.is_connected() {
137 return Err(HdcError::NotConnected);
138 }
139 if let Some(ref mut tcp_stream) = self.stream {
140 debug!("Sending command: {}", command);
141
142 // For simple commands, just send the command string
143 let cmd_bytes = command.as_bytes();
144 self.codec.write_packet(tcp_stream, cmd_bytes).await?;
145
146 return Ok(());
147 }
148 Err(HdcError::NotConnected)
149 }
150
151 /// Read response from server
152 pub async fn read_response(&mut self) -> Result<Vec<u8>> {
153 if !self.is_connected() {
154 return Err(HdcError::NotConnected);
155 }
156
157 let stream = self.stream.as_mut().unwrap();
158 let data = self.codec.read_packet(stream).await?;
159
160 Ok(data)
161 }
162
163 /// Read response as string
164 pub async fn read_response_string(&mut self) -> Result<String> {
165 let data = self.read_response().await?;
166
167 if data.is_empty() {
168 return Ok(String::new());
169 }
170
171 // Check if there's a command prefix (2 bytes)
172 if data.len() >= 2 {
173 let cmd_code = u16::from_le_bytes([data[0], data[1]]);
174 if let Some(cmd) = HdcCommand::from_u16(cmd_code) {
175 debug!("Response has command prefix: {:?}", cmd);
176 // Skip command bytes
177 return Ok(String::from_utf8(data[2..].to_vec())?);
178 }
179 }
180
181 Ok(String::from_utf8(data)?)
182 }
183
184 /// Execute a shell command and return output
185 ///
186 /// If a device has been selected via `connect_device()`, the command will be
187 /// executed on that device (the device ID is set in the channel's connectKey
188 /// during handshake). Otherwise, HDC server will return an error asking
189 /// to specify a device.
190 ///
191 /// Note: Each shell command uses up the current channel. After execution,
192 /// the connection is automatically re-established if a device was connected.
193 pub async fn shell(&mut self, cmd: &str) -> Result<String> {
194 info!("Executing shell command: {}", cmd);
195
196 // Save the current connect key before executing
197 let device_id = self.connect_key.clone();
198
199 // Command format is just "shell <cmd>"
200 // Device targeting is done via the connectKey in handshake, not via -t parameter
201 let full_cmd = format!("shell {}", cmd);
202
203 self.send_command(&full_cmd).await?;
204
205 // For shell commands, HDC server sends a single response packet with raw output data
206 // No command code prefix, just the plain output
207 let output = match timeout(Duration::from_secs(5), self.read_response()).await {
208 Ok(Ok(data)) => {
209 debug!("Shell response: {} bytes", data.len());
210 String::from_utf8_lossy(&data).to_string()
211 }
212 Ok(Err(e)) => {
213 debug!("Error reading shell response: {}", e);
214 return Err(e);
215 }
216 Err(_) => {
217 warn!("Timeout reading shell response");
218 return Err(HdcError::Timeout);
219 }
220 };
221
222 // Shell command consumes the channel - reconnect if we had a device
223 if let Some(device) = device_id {
224 debug!("Reconnecting to device after shell command");
225 if let Err(e) = self.connect_device(&device).await {
226 warn!("Failed to reconnect after shell: {}", e);
227 // Don't fail the shell command itself, just log the warning
228 }
229 }
230
231 Ok(output)
232 }
233
234 /// List connected devices/targets
235 pub async fn list_targets(&mut self) -> Result<Vec<String>> {
236 info!("Listing targets");
237
238 self.send_command("list targets").await?;
239
240 let response = self.read_response_string().await?;
241 debug!("List targets response: {}", response);
242
243 // Parse device list (format: one device per line)
244 let devices: Vec<String> = response
245 .lines()
246 .map(|line| line.trim())
247 .filter(|line| !line.is_empty())
248 .map(|line| line.to_string())
249 .collect();
250
251 info!("Found {} device(s)", devices.len());
252 Ok(devices)
253 }
254
255 // pub async fn get_device_stream(&self, device_id: &str) -> Result<HdcClient>{
256 // let stream = timeout(DEFAULT_TIMEOUT, TcpStream::connect(&self.address))
257 // .await
258 // .map_err(|_| HdcError::Timeout)?
259 // .map_err(HdcError::Io)?;
260 // let mut client = HdcClient{
261 // stream: Some(stream),
262 // address: self.address.clone(),
263 // codec: PacketCodec::new(),
264 // channel_id: 0,
265 // handshake_ok: false,
266 // connect_key: None,
267 // };
268 // client.perform_handshake(Some(device_id)).await?;
269 // Ok(client)
270 // }
271
272 /// Connect to a specific device
273 ///
274 /// This re-establishes the connection with the specified device ID in the handshake.
275 /// After calling this, all commands will be executed on the specified device.
276 pub async fn connect_device(&mut self, device_id: &str) -> Result<()> {
277 info!("Connecting to device: {}", device_id);
278
279 // Close existing connection
280 if self.stream.is_some() {
281 debug!("Closing existing connection");
282 self.stream = None;
283 self.handshake_ok = false;
284 }
285
286 // Reconnect with new device ID
287 let stream = timeout(DEFAULT_TIMEOUT, TcpStream::connect(&self.address))
288 .await
289 .map_err(|_| HdcError::Timeout)?
290 .map_err(HdcError::Io)?;
291
292 self.stream = Some(stream);
293
294 // Perform handshake with connect key
295 self.perform_handshake(Some(device_id)).await?;
296 self.connect_key = Some(device_id.to_string());
297
298 Ok(())
299 }
300
301 /// Check server version
302 pub async fn check_server(&mut self) -> Result<String> {
303 info!("Checking server version");
304
305 self.send_command("checkserver").await?;
306 let response = self.read_response_string().await?;
307
308 debug!("Server version: {}", response);
309 Ok(response)
310 }
311
312 /// Execute a command on a specific device
313 ///
314 /// This is a convenience method that:
315 /// 1. Connects to the specified device (re-handshake with connectKey)
316 /// 2. Executes the command
317 ///
318 /// Note: This changes the client's current device setting.
319 pub async fn target_command(&mut self, device_id: &str, cmd: &str) -> Result<String> {
320 info!("Executing target command on {}: {}", device_id, cmd);
321
322 // Connect to device first (sets connectKey in handshake)
323 self.connect_device(device_id).await?;
324
325 // Send command directly
326 self.send_command(cmd).await?;
327 let output = self.read_response_string().await?;
328 Ok(output)
329 }
330
331 /// Execute a shell command on a specific device (convenience method)
332 ///
333 /// This connects to the device and executes: `shell <cmd>`
334 pub async fn shell_on_device(&mut self, device_id: &str, cmd: &str) -> Result<String> {
335 // Connect to device first
336 self.connect_device(device_id).await?;
337
338 // Execute shell command
339 self.shell(cmd).await
340 }
341
342 /// Close the connection
343 pub async fn close(&mut self) -> Result<()> {
344 if let Some(stream) = self.stream.take() {
345 info!("Closing connection");
346 drop(stream);
347 self.handshake_ok = false;
348 }
349 Ok(())
350 }
351
352 // ========== Forward Commands ==========
353
354 /// Create a port forward (fport)
355 ///
356 /// Forward local traffic to remote device.
357 ///
358 /// # Example
359 /// ```no_run
360 /// # use hdc_rs::{HdcClient, ForwardNode};
361 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
362 /// # let mut client = HdcClient::connect("127.0.0.1:8710").await?;
363 /// // Forward local TCP 8080 to device TCP 8081
364 /// client.fport(ForwardNode::Tcp(8080), ForwardNode::Tcp(8081)).await?;
365 /// # Ok(())
366 /// # }
367 /// ```
368 pub async fn fport(
369 &mut self,
370 local: crate::forward::ForwardNode,
371 remote: crate::forward::ForwardNode,
372 ) -> Result<String> {
373 info!(
374 "Creating forward: {} -> {}",
375 local.as_protocol_string(),
376 remote.as_protocol_string()
377 );
378
379 let cmd = format!(
380 "fport {} {}",
381 local.as_protocol_string(),
382 remote.as_protocol_string()
383 );
384 self.send_command(&cmd).await?;
385
386 let response = self.read_response_string().await?;
387 debug!("Forward response: {}", response);
388 Ok(response)
389 }
390
391 /// Create a reverse port forward (rport)
392 ///
393 /// Reserve remote traffic to local host.
394 ///
395 /// # Example
396 /// ```no_run
397 /// # use hdc_rs::{HdcClient, ForwardNode};
398 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
399 /// # let mut client = HdcClient::connect("127.0.0.1:8710").await?;
400 /// // Forward device TCP 8080 to local TCP 8081
401 /// client.rport(ForwardNode::Tcp(8080), ForwardNode::Tcp(8081)).await?;
402 /// # Ok(())
403 /// # }
404 /// ```
405 pub async fn rport(
406 &mut self,
407 remote: crate::forward::ForwardNode,
408 local: crate::forward::ForwardNode,
409 ) -> Result<String> {
410 info!(
411 "Creating reverse forward: {} -> {}",
412 remote.as_protocol_string(),
413 local.as_protocol_string()
414 );
415
416 let cmd = format!(
417 "rport {} {}",
418 remote.as_protocol_string(),
419 local.as_protocol_string()
420 );
421 self.send_command(&cmd).await?;
422
423 let response = self.read_response_string().await?;
424 debug!("Reverse forward response: {}", response);
425 Ok(response)
426 }
427
428 /// List all forward/reverse tasks
429 ///
430 /// Note: This command does not require a device connection.
431 /// It lists forwards across all devices.
432 pub async fn fport_list(&mut self) -> Result<Vec<String>> {
433 info!("Listing forward tasks");
434
435 // fport ls doesn't need connectKey, use a temporary connection
436 let mut temp_client = Self::new(&self.address);
437 temp_client.connect_internal().await?;
438
439 temp_client.send_command("fport ls").await?;
440 let response = temp_client.read_response_string().await?;
441 debug!("Forward list response: {}", response);
442
443 // Check for error messages
444 if response.starts_with("[Fail]") {
445 return Err(HdcError::Protocol(response));
446 }
447
448 // Parse the response - each line is a forward task
449 let tasks: Vec<String> = response
450 .lines()
451 .map(|line| line.trim())
452 .filter(|line| !line.is_empty())
453 .map(|line| line.to_string())
454 .collect();
455
456 Ok(tasks)
457 }
458
459 /// Remove a forward/reverse task by task string
460 ///
461 /// Note: This command does not require a device connection.
462 ///
463 /// # Example
464 /// ```no_run
465 /// # use hdc_rs::HdcClient;
466 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
467 /// # let mut client = HdcClient::connect("127.0.0.1:8710").await?;
468 /// client.fport_remove("tcp:8080 tcp:8081").await?;
469 /// # Ok(())
470 /// # }
471 /// ```
472 pub async fn fport_remove(&mut self, task_str: &str) -> Result<String> {
473 info!("Removing forward task: {}", task_str);
474
475 // fport rm doesn't need connectKey, use a temporary connection
476 let mut temp_client = Self::new(&self.address);
477 temp_client.connect_internal().await?;
478
479 let cmd = format!("fport rm {}", task_str);
480 temp_client.send_command(&cmd).await?;
481
482 let response = temp_client.read_response_string().await?;
483 debug!("Remove forward response: {}", response);
484
485 // Check for error messages
486 if response.starts_with("[Fail]") {
487 return Err(HdcError::Protocol(response));
488 }
489
490 Ok(response)
491 }
492
493 // ========== App Commands ==========
494
495 /// Install application package(s) to device
496 ///
497 /// # Arguments
498 /// * `paths` - Single or multiple package paths (.hap, .hsp) or directories
499 /// * `options` - Install options (replace, shared)
500 ///
501 /// # Example
502 /// ```no_run
503 /// # use hdc_rs::{HdcClient, InstallOptions};
504 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
505 /// # let mut client = HdcClient::connect("127.0.0.1:8710").await?;
506 /// let opts = InstallOptions::new().replace(true);
507 /// client.install(&["app.hap"], opts).await?;
508 /// # Ok(())
509 /// # }
510 /// ```
511 pub async fn install(
512 &mut self,
513 paths: &[&str],
514 options: crate::app::InstallOptions,
515 ) -> Result<String> {
516 info!("Installing app: {:?} with options: {:?}", paths, options);
517
518 let flags = options.to_flags();
519 let paths_str = paths.join(" ");
520
521 let cmd = if flags.is_empty() {
522 format!("install {}", paths_str)
523 } else {
524 format!("install {} {}", flags, paths_str)
525 };
526
527 self.send_command(&cmd).await?;
528
529 // Install may take time and send multiple responses
530 let mut output = String::new();
531 loop {
532 match timeout(Duration::from_secs(30), self.read_response_string()).await {
533 Ok(Ok(resp)) => {
534 if resp.is_empty() {
535 break;
536 }
537 output.push_str(&resp);
538
539 // Check if installation completed
540 if resp.contains("Success")
541 || resp.contains("success")
542 || resp.contains("Fail")
543 || resp.contains("fail")
544 {
545 break;
546 }
547 }
548 Ok(Err(e)) => return Err(e),
549 Err(_) => {
550 warn!("Timeout waiting for install response");
551 break;
552 }
553 }
554 }
555
556 debug!("Install output: {} bytes", output.len());
557 Ok(output)
558 }
559
560 /// Uninstall application package from device
561 ///
562 /// # Arguments
563 /// * `package` - Package name to uninstall
564 /// * `options` - Uninstall options (keep_data, shared)
565 ///
566 /// # Example
567 /// ```no_run
568 /// # use hdc_rs::{HdcClient, UninstallOptions};
569 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
570 /// # let mut client = HdcClient::connect("127.0.0.1:8710").await?;
571 /// let opts = UninstallOptions::new().keep_data(true);
572 /// client.uninstall("com.example.app", opts).await?;
573 /// # Ok(())
574 /// # }
575 /// ```
576 pub async fn uninstall(
577 &mut self,
578 package: &str,
579 options: crate::app::UninstallOptions,
580 ) -> Result<String> {
581 info!("Uninstalling app: {} with options: {:?}", package, options);
582
583 let flags = options.to_flags();
584
585 let cmd = if flags.is_empty() {
586 format!("uninstall {}", package)
587 } else {
588 format!("uninstall {} {}", flags, package)
589 };
590
591 self.send_command(&cmd).await?;
592
593 let response = self.read_response_string().await?;
594 debug!("Uninstall response: {}", response);
595 Ok(response)
596 }
597
598 /// Display device logs using hilog
599 ///
600 /// This method streams logs from the device. The log stream will continue until
601 /// the connection is closed or an error occurs.
602 ///
603 /// # Arguments
604 /// * `args` - Optional arguments for hilog command (e.g., "-h" for help, "-t app" for app logs)
605 ///
606 /// # Example
607 /// ```no_run
608 /// # use hdc_rs::HdcClient;
609 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
610 /// # let mut client = HdcClient::connect("127.0.0.1:8710").await?;
611 /// # client.connect_device("device_id").await?;
612 /// // Display all logs
613 /// let logs = client.hilog(None).await?;
614 /// println!("{}", logs);
615 ///
616 /// // Display only app logs
617 /// let app_logs = client.hilog(Some("-t app")).await?;
618 /// println!("{}", app_logs);
619 /// # Ok(())
620 /// # }
621 /// ```
622 pub async fn hilog(&mut self, args: Option<&str>) -> Result<String> {
623 info!("Reading hilog: {:?}", args);
624
625 let cmd = if let Some(args) = args {
626 format!("hilog {}", args)
627 } else {
628 "hilog".to_string()
629 };
630
631 self.send_command(&cmd).await?;
632
633 let mut output = String::new();
634
635 // Read log stream with extended timeout
636 // Hilog streams continuously, we read for a reasonable amount of time
637 loop {
638 match timeout(Duration::from_secs(5), self.read_response_string()).await {
639 Ok(Ok(resp)) => {
640 if resp.is_empty() {
641 break;
642 }
643 output.push_str(&resp);
644
645 // For continuous log streaming, check if user wants to stop
646 // In practice, you might want to use a callback or channel here
647 // to allow real-time log streaming instead of buffering
648 }
649 Ok(Err(e)) => return Err(e),
650 Err(_) => {
651 // Timeout - check if we got any data
652 if output.is_empty() {
653 warn!("Timeout waiting for hilog response");
654 return Err(HdcError::Timeout);
655 }
656 // Otherwise, this might just be the end of the log stream
657 break;
658 }
659 }
660 }
661
662 debug!("Hilog output: {} bytes", output.len());
663 Ok(output)
664 }
665
666 /// Stream hilog output continuously with a callback
667 ///
668 /// This method streams logs from the device and calls the provided callback
669 /// for each log chunk received. The stream continues until an error occurs
670 /// or the callback returns false.
671 ///
672 /// # Arguments
673 /// * `args` - Optional arguments for hilog command
674 /// * `callback` - Function to call for each log chunk. Return false to stop streaming.
675 ///
676 /// # Example
677 /// ```no_run
678 /// # use hdc_rs::HdcClient;
679 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
680 /// # let mut client = HdcClient::connect("127.0.0.1:8710").await?;
681 /// # client.connect_device("device_id").await?;
682 /// client.hilog_stream(None, |log_chunk| {
683 /// print!("{}", log_chunk);
684 /// true // Continue streaming
685 /// }).await?;
686 /// # Ok(())
687 /// # }
688 /// ```
689 pub async fn hilog_stream<F>(&mut self, args: Option<&str>, mut callback: F) -> Result<()>
690 where
691 F: FnMut(&str) -> bool,
692 {
693 info!("Starting hilog stream: {:?}", args);
694
695 let cmd = if let Some(args) = args {
696 format!("hilog {}", args)
697 } else {
698 "hilog".to_string()
699 };
700
701 self.send_command(&cmd).await?;
702
703 // Stream logs continuously
704 loop {
705 match timeout(Duration::from_secs(30), self.read_response_string()).await {
706 Ok(Ok(resp)) => {
707 if resp.is_empty() {
708 break;
709 }
710
711 // Call user callback with log chunk
712 if !callback(&resp) {
713 info!("Hilog stream stopped by callback");
714 break;
715 }
716 }
717 Ok(Err(e)) => {
718 warn!("Error reading hilog stream: {:?}", e);
719 return Err(e);
720 }
721 Err(_) => {
722 warn!("Timeout reading hilog stream");
723 break;
724 }
725 }
726 }
727
728 Ok(())
729 }
730
731 /// Wait for any device to connect
732 ///
733 /// This command blocks until at least one device is connected.
734 /// If a device is already connected, it returns immediately.
735 ///
736 /// # Example
737 /// ```no_run
738 /// # use hdc_rs::HdcClient;
739 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
740 /// # let mut client = HdcClient::connect("127.0.0.1:8710").await?;
741 /// // Wait for any device
742 /// let device = client.wait_for_device().await?;
743 /// println!("Device connected: {}", device);
744 /// # Ok(())
745 /// # }
746 /// ```
747 pub async fn wait_for_device(&mut self) -> Result<String> {
748 info!("Waiting for device...");
749
750 self.send_command("wait").await?;
751
752 let response = self.read_response_string().await?;
753 debug!("Wait for device response: {}", response);
754
755 // Response format: "Wait for connected target is <device_id>"
756 if let Some(device_id) = response.split("is ").nth(1) {
757 Ok(device_id.trim().to_string())
758 } else {
759 // Fallback: just return the whole response
760 Ok(response.trim().to_string())
761 }
762 }
763
764 /// Monitor device list changes with a callback
765 ///
766 /// This function continuously polls the device list and calls the callback
767 /// when changes are detected. The polling interval can be configured.
768 ///
769 /// Note: HDC doesn't have a native "track-devices" command like adb,
770 /// so this implementation uses polling to detect changes. Each poll creates
771 /// a new connection to ensure reliability.
772 ///
773 /// # Arguments
774 /// * `interval` - Polling interval (recommended: 1-3 seconds)
775 /// * `callback` - Function called when device list changes. Return false to stop monitoring.
776 ///
777 /// # Example
778 /// ```no_run
779 /// # use hdc_rs::HdcClient;
780 /// # use std::time::Duration;
781 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
782 /// # let mut client = HdcClient::connect("127.0.0.1:8710").await?;
783 /// client.monitor_devices(Duration::from_secs(2), |devices| {
784 /// println!("Device list changed:");
785 /// for device in devices {
786 /// println!(" - {}", device);
787 /// }
788 /// true // Continue monitoring
789 /// }).await?;
790 /// # Ok(())
791 /// # }
792 /// ```
793 pub async fn monitor_devices<F>(&mut self, interval: Duration, mut callback: F) -> Result<()>
794 where
795 F: FnMut(&[String]) -> bool,
796 {
797 info!("Starting device monitoring with interval: {:?}", interval);
798
799 let mut previous_devices: Vec<String> = Vec::new();
800
801 loop {
802 // Reconnect for each poll to ensure fresh connection
803 // HDC server closes connection after each request
804 if let Err(e) = self.connect_internal().await {
805 warn!("Failed to reconnect during monitoring: {:?}", e);
806 tokio::time::sleep(interval).await;
807 continue;
808 }
809
810 // Get current device list
811 match self.list_targets().await {
812 Ok(devices) => {
813 // Check if device list has changed
814 if devices != previous_devices {
815 debug!(
816 "Device list changed: {:?} -> {:?}",
817 previous_devices, devices
818 );
819
820 // Call user callback
821 if !callback(&devices) {
822 info!("Device monitoring stopped by callback");
823 break;
824 }
825
826 previous_devices = devices;
827 }
828 }
829 Err(e) => {
830 warn!("Error listing devices during monitoring: {:?}", e);
831 // Continue monitoring even if there's an error
832 }
833 }
834
835 // Wait before next poll
836 tokio::time::sleep(interval).await;
837 }
838
839 Ok(())
840 }
841
842 /// Send file to device
843 ///
844 /// Transfer a file from local path to remote device path.
845 ///
846 /// # Arguments
847 /// * `local_path` - Local file path to send
848 /// * `remote_path` - Remote device path destination
849 /// * `options` - File transfer options (timestamp, sync, compress, etc.)
850 ///
851 /// # Example
852 /// ```no_run
853 /// # use hdc_rs::{HdcClient, FileTransferOptions};
854 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
855 /// # let mut client = HdcClient::connect("127.0.0.1:8710").await?;
856 /// # client.connect_device("device_id").await?;
857 /// let opts = FileTransferOptions::new()
858 /// .hold_timestamp(true)
859 /// .compress(true);
860 /// client.file_send("test.txt", "/data/local/tmp/test.txt", opts).await?;
861 /// # Ok(())
862 /// # }
863 /// ```
864 pub async fn file_send(
865 &mut self,
866 local_path: &str,
867 remote_path: &str,
868 options: crate::file::FileTransferOptions,
869 ) -> Result<String> {
870 info!("Sending file: {} -> {}", local_path, remote_path);
871
872 // Validate paths
873 if !crate::file::validate_path(local_path) || !crate::file::validate_path(remote_path) {
874 return Err(HdcError::Protocol("Invalid file path".to_string()));
875 }
876
877 // Build command
878 let flags = options.to_flags();
879 let cmd = if flags.is_empty() {
880 format!("file send {} {}", local_path, remote_path)
881 } else {
882 format!("file send {} {} {}", flags, local_path, remote_path)
883 };
884
885 info!("File send command: {}", cmd);
886 self.send_command(&cmd).await?;
887
888 // Read transfer responses
889 let mut output = String::new();
890 loop {
891 match timeout(Duration::from_secs(60), self.read_response_string()).await {
892 Ok(Ok(resp)) => {
893 if resp.is_empty() {
894 break;
895 }
896 output.push_str(&resp);
897
898 // Check for completion indicators
899 if resp.contains("FileTransfer finish")
900 || resp.contains("Transfer finish")
901 || resp.contains("[Fail]")
902 || resp.contains("fail")
903 {
904 break;
905 }
906 }
907 Ok(Err(e)) => return Err(e),
908 Err(_) => {
909 warn!("Timeout during file transfer");
910 if output.is_empty() {
911 return Err(HdcError::Timeout);
912 }
913 break;
914 }
915 }
916 }
917
918 debug!("File send output: {} bytes", output.len());
919 Ok(output)
920 }
921
922 /// Receive file from device
923 ///
924 /// Transfer a file from remote device path to local path.
925 ///
926 /// # Arguments
927 /// * `remote_path` - Remote device file path to receive
928 /// * `local_path` - Local destination path
929 /// * `options` - File transfer options (timestamp, sync, compress, etc.)
930 ///
931 /// # Example
932 /// ```no_run
933 /// # use hdc_rs::{HdcClient, FileTransferOptions};
934 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
935 /// # let mut client = HdcClient::connect("127.0.0.1:8710").await?;
936 /// # client.connect_device("device_id").await?;
937 /// let opts = FileTransferOptions::new().hold_timestamp(true);
938 /// client.file_recv("/data/local/tmp/test.txt", "test.txt", opts).await?;
939 /// # Ok(())
940 /// # }
941 /// ```
942 pub async fn file_recv(
943 &mut self,
944 remote_path: &str,
945 local_path: &str,
946 options: crate::file::FileTransferOptions,
947 ) -> Result<String> {
948 info!("Receiving file: {} -> {}", remote_path, local_path);
949
950 // Validate paths
951 if !crate::file::validate_path(local_path) || !crate::file::validate_path(remote_path) {
952 return Err(HdcError::Protocol("Invalid file path".to_string()));
953 }
954
955 // Build command
956 let flags = options.to_flags();
957 let cmd = if flags.is_empty() {
958 format!("file recv {} {}", remote_path, local_path)
959 } else {
960 format!("file recv {} {} {}", flags, remote_path, local_path)
961 };
962
963 info!("File recv command: {}", cmd);
964 self.send_command(&cmd).await?;
965
966 // Read transfer responses
967 let mut output = String::new();
968 loop {
969 match timeout(Duration::from_secs(60), self.read_response_string()).await {
970 Ok(Ok(resp)) => {
971 if resp.is_empty() {
972 break;
973 }
974 output.push_str(&resp);
975
976 // Check for completion indicators
977 if resp.contains("FileTransfer finish")
978 || resp.contains("Transfer finish")
979 || resp.contains("[Fail]")
980 || resp.contains("fail")
981 {
982 break;
983 }
984 }
985 Ok(Err(e)) => return Err(e),
986 Err(_) => {
987 warn!("Timeout during file transfer");
988 if output.is_empty() {
989 return Err(HdcError::Timeout);
990 }
991 break;
992 }
993 }
994 }
995
996 debug!("File recv output: {} bytes", output.len());
997 Ok(output)
998 }
999}
1000
1001impl Drop for HdcClient {
1002 fn drop(&mut self) {
1003 if self.stream.is_some() {
1004 debug!("HdcClient dropped, connection will be closed");
1005 }
1006 }
1007}
1008
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly constructed client keeps its address and is not connected.
    #[test]
    fn test_client_creation() {
        let addr = "127.0.0.1:8710";
        let fresh = HdcClient::new(addr);

        assert!(!fresh.is_connected());
        assert_eq!(fresh.address, addr);
    }
}