circuitpython_deploy/
board.rs1use crate::error::{CpdError, Result};
2use std::path::{Path, PathBuf};
3use sysinfo::Disks;
4
/// A mounted volume that has been identified as a CircuitPython board.
#[derive(Debug, Clone)]
pub struct CircuitPythonBoard {
    /// Location of the board's filesystem (the disk mount point when produced by detection).
    pub path: PathBuf,
    /// Disk name as reported for the volume.
    pub name: String,
    /// Volume label, when one could be determined.
    pub volume_label: Option<String>,
    /// Total capacity of the volume, in bytes.
    pub total_space: u64,
    /// Space still free on the volume, in bytes.
    pub available_space: u64,
}
13
14impl CircuitPythonBoard {
15 pub fn new(path: PathBuf, name: String, volume_label: Option<String>, total_space: u64, available_space: u64) -> Self {
16 Self {
17 path,
18 name,
19 volume_label,
20 total_space,
21 available_space,
22 }
23 }
24
25 pub fn display_name(&self) -> String {
26 match &self.volume_label {
27 Some(label) => format!("{} ({})", self.name, label),
28 None => self.name.clone(),
29 }
30 }
31
32 pub fn format_space(&self) -> String {
33 format!(
34 "{} / {} available",
35 format_bytes(self.available_space),
36 format_bytes(self.total_space)
37 )
38 }
39}
40
/// Locates CircuitPython boards among the mounted disks.
pub struct BoardDetector {
    /// When set, detection progress is printed to stdout.
    verbose: bool,
}
44
45impl BoardDetector {
46 pub fn new(verbose: bool) -> Self {
47 Self { verbose }
48 }
49
50 pub fn detect_boards(&self) -> Result<Vec<CircuitPythonBoard>> {
52 let mut boards = Vec::new();
53 let disks = Disks::new_with_refreshed_list();
54
55 for disk in &disks {
56 let mount_point = disk.mount_point();
57
58 if self.verbose {
59 println!("Checking disk: {} at {}", disk.name().to_string_lossy(), mount_point.display());
60 }
61
62 if self.is_circuitpython_board(mount_point) {
63 let volume_label = self.get_volume_label(mount_point);
64 let board = CircuitPythonBoard::new(
65 mount_point.to_path_buf(),
66 disk.name().to_string_lossy().to_string(),
67 volume_label,
68 disk.total_space(),
69 disk.available_space(),
70 );
71
72 if self.verbose {
73 println!("Found CircuitPython board: {}", board.display_name());
74 }
75
76 boards.push(board);
77 }
78 }
79
80 Ok(boards)
81 }
82
83 pub fn is_circuitpython_board(&self, path: &Path) -> bool {
85 if !path.exists() || !path.is_dir() {
86 return false;
87 }
88
89 let volume_label = self.get_volume_label(path);
91 let has_circuitpy_label = volume_label
92 .as_ref()
93 .map(|label| label.to_uppercase().contains("CIRCUITPY"))
94 .unwrap_or(false);
95
96 if has_circuitpy_label {
97 return true;
98 }
99
100 let optional_files = ["code.py", "main.py", "lib"];
102
103 let has_boot_out = path.join("boot_out.txt").exists();
105 if !has_boot_out {
106 return false;
107 }
108
109 let mut found_optional = 0;
111 for file in &optional_files {
112 if path.join(file).exists() {
113 found_optional += 1;
114 }
115 }
116
117 if let Ok(content) = std::fs::read_to_string(path.join("boot_out.txt")) {
119 let content_lower = content.to_lowercase();
121 if content_lower.contains("circuitpython") || content_lower.contains("adafruit") {
122 return true;
123 }
124 }
125
126 let cp_indicators = [
128 "CIRCUITPY.USB_VID",
129 "CIRCUITPY.USB_PID",
130 "settings.toml",
131 ".fseventsd", ];
133
134 let mut found_cp_indicators = 0;
135 for indicator in &cp_indicators {
136 if path.join(indicator).exists() {
137 found_cp_indicators += 1;
138 }
139 }
140
141 has_boot_out && (found_optional >= 1 || found_cp_indicators >= 1)
145 }
146
147 fn get_volume_label(&self, path: &Path) -> Option<String> {
149 #[cfg(windows)]
153 {
154 self.get_windows_volume_label(path)
155 }
156
157 #[cfg(unix)]
158 {
159 self.get_unix_volume_label(path)
160 }
161 }
162
163 #[cfg(windows)]
164 fn get_windows_volume_label(&self, path: &Path) -> Option<String> {
165 use std::ffi::OsStr;
166 use std::os::windows::ffi::OsStrExt;
167 use winapi::um::fileapi::GetVolumeInformationW;
168
169 let root_path = if let Some(root) = path.components().next() {
171 let mut root_str = root.as_os_str().to_string_lossy().to_string();
172 if !root_str.ends_with('\\') {
173 root_str.push('\\');
174 }
175 root_str
176 } else {
177 return None;
178 };
179
180 let path_wide: Vec<u16> = OsStr::new(&root_path)
181 .encode_wide()
182 .chain(std::iter::once(0))
183 .collect();
184
185 let mut volume_name = [0u16; 256];
186 let mut file_system_name = [0u16; 256];
187 let mut volume_serial_number = 0;
188 let mut maximum_component_length = 0;
189 let mut file_system_flags = 0;
190
191 unsafe {
192 let result = GetVolumeInformationW(
193 path_wide.as_ptr(),
194 volume_name.as_mut_ptr(),
195 volume_name.len() as u32,
196 &mut volume_serial_number,
197 &mut maximum_component_length,
198 &mut file_system_flags,
199 file_system_name.as_mut_ptr(),
200 file_system_name.len() as u32,
201 );
202
203 if result != 0 {
204 let len = volume_name.iter().position(|&x| x == 0).unwrap_or(volume_name.len());
205 if len > 0 {
206 let label = String::from_utf16(&volume_name[..len]).ok()?;
207 if !label.trim().is_empty() {
208 return Some(label.trim().to_string());
209 }
210 }
211 }
212 }
213
214 self.get_volume_label_from_sysinfo(path)
216 }
217
218 #[cfg(unix)]
219 fn get_unix_volume_label(&self, path: &Path) -> Option<String> {
220 if let Some(label) = self.get_unix_label_from_mounts(path) {
224 return Some(label);
225 }
226
227 if let Some(label) = self.get_unix_label_from_blkid(path) {
229 return Some(label);
230 }
231
232 self.get_volume_label_from_sysinfo(path)
234 }
235
236 #[cfg(unix)]
237 fn get_unix_label_from_mounts(&self, path: &Path) -> Option<String> {
238 use std::fs;
239
240 if let Ok(mounts) = fs::read_to_string("/proc/mounts") {
242 for line in mounts.lines() {
243 let parts: Vec<&str> = line.split_whitespace().collect();
244 if parts.len() >= 2 {
245 let mount_point = parts[1];
246 if path.starts_with(mount_point) {
247 let device = parts[0];
249 if device.contains("CIRCUITPY") {
250 return Some("CIRCUITPY".to_string());
251 }
252 }
253 }
254 }
255 }
256 None
257 }
258
259 #[cfg(unix)]
260 fn get_unix_label_from_blkid(&self, _path: &Path) -> Option<String> {
261 None
264 }
265
266 fn get_volume_label_from_sysinfo(&self, path: &Path) -> Option<String> {
268 let disks = Disks::new_with_refreshed_list();
269
270 for disk in &disks {
271 if disk.mount_point() == path {
272 let name = disk.name().to_string_lossy();
273 if !name.is_empty() && name != "Unknown" {
274 return Some(name.to_string());
275 }
276 }
277 }
278 None
279 }
280
281 pub fn select_board<'a>(&self, boards: &'a [CircuitPythonBoard]) -> Result<&'a CircuitPythonBoard> {
283 if boards.is_empty() {
284 return Err(CpdError::BoardNotFound);
285 }
286
287 if boards.len() == 1 {
288 return Ok(&boards[0]);
289 }
290
291 println!("Multiple CircuitPython boards detected:");
292 for (i, board) in boards.iter().enumerate() {
293 println!(" {}: {} at {} ({})",
294 i + 1,
295 board.display_name(),
296 board.path.display(),
297 board.format_space()
298 );
299 }
300
301 println!("Please select a board (1-{}):", boards.len());
302
303 use std::io::{self, Write};
304 loop {
305 print!("> ");
306 io::stdout().flush().unwrap();
307
308 let mut input = String::new();
309 io::stdin().read_line(&mut input).unwrap();
310
311 match input.trim().parse::<usize>() {
312 Ok(choice) if choice >= 1 && choice <= boards.len() => {
313 return Ok(&boards[choice - 1]);
314 }
315 _ => {
316 println!("Invalid selection. Please enter a number between 1 and {}.", boards.len());
317 }
318 }
319 }
320 }
321
322 pub fn list_boards(&self) -> Result<()> {
324 let boards = self.detect_boards()?;
325
326 if boards.is_empty() {
327 println!("No CircuitPython boards detected.");
328 println!("\nTroubleshooting:");
329 println!(" - Ensure your CircuitPython board is connected via USB");
330 println!(" - Check that the board appears as a removable drive");
331 println!(" - Try pressing the RESET button on your board");
332 return Ok(());
333 }
334
335 println!("Detected CircuitPython boards:");
336 for board in &boards {
337 println!(" • {} at {}", board.display_name(), board.path.display());
338 println!(" Space: {}", board.format_space());
339
340 if let Ok(entries) = std::fs::read_dir(&board.path) {
342 let file_count = entries.count();
343 println!(" Files: {} items", file_count);
344 }
345
346 println!();
347 }
348
349 Ok(())
350 }
351}
352
/// Renders a byte count as a short human-readable string.
///
/// Values below 1024 are printed exactly (`"512 B"`). Larger values are scaled
/// by powers of 1024 and printed with one decimal place (`"1.5 KB"`). The unit
/// table runs through `EB`, which covers the full `u64` range, so large
/// volumes no longer show up as thousands of gigabytes.
fn format_bytes(bytes: u64) -> String {
    const UNITS: [&str; 7] = ["B", "KB", "MB", "GB", "TB", "PB", "EB"];
    const STEP: f64 = 1024.0;

    // Plain byte counts are shown as integers, without a fractional part.
    if bytes < 1024 {
        return format!("{} {}", bytes, UNITS[0]);
    }

    let mut size = bytes as f64;
    let mut unit_index = 0;
    // Stop at the last unit so the index can never run past the table.
    while size >= STEP && unit_index + 1 < UNITS.len() {
        size /= STEP;
        unit_index += 1;
    }

    format!("{:.1} {}", size, UNITS[unit_index])
}
369
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use tempfile::TempDir;

    /// An empty directory is rejected; once `boot_out.txt` (mentioning
    /// CircuitPython) and `code.py` are present, the directory is accepted.
    #[test]
    fn test_is_circuitpython_board() {
        let board_dir = TempDir::new().unwrap();
        let root = board_dir.path();
        let detector = BoardDetector::new(false);

        assert!(!detector.is_circuitpython_board(root));

        fs::write(root.join("boot_out.txt"), "CircuitPython test").unwrap();
        fs::write(root.join("code.py"), "print('hello')").unwrap();

        assert!(detector.is_circuitpython_board(root));
    }

    /// Checks exact byte output and the one-decimal scaled output.
    #[test]
    fn test_format_bytes() {
        let cases: [(u64, &str); 5] = [
            (0, "0 B"),
            (512, "512 B"),
            (1024, "1.0 KB"),
            (1536, "1.5 KB"),
            (1048576, "1.0 MB"),
        ];

        for &(input, expected) in cases.iter() {
            assert_eq!(format_bytes(input), expected);
        }
    }
}