// edgefirst_client/client.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright © 2025 Au-Zone Technologies. All Rights Reserved.
3
4use crate::{
5    Annotation, Error, Sample, Task,
6    api::{
7        AnnotationSetID, Artifact, DatasetID, Experiment, ExperimentID, LoginResult, Organization,
8        Project, ProjectID, SamplesCountResult, SamplesListParams, SamplesListResult, Snapshot,
9        SnapshotID, SnapshotRestore, SnapshotRestoreResult, Stage, TaskID, TaskInfo, TaskStages,
10        TaskStatus, TasksListParams, TasksListResult, TrainingSession, TrainingSessionID,
11        ValidationSession, ValidationSessionID,
12    },
13    dataset::{AnnotationSet, AnnotationType, Dataset, FileType, Label, NewLabel, NewLabelObject},
14    retry::{create_retry_policy, log_retry_configuration},
15};
16use base64::Engine as _;
17use chrono::{DateTime, Utc};
18use directories::ProjectDirs;
19use futures::{StreamExt as _, future::join_all};
20use log::{Level, debug, error, log_enabled, trace, warn};
21use reqwest::{Body, header::CONTENT_LENGTH, multipart::Form};
22use serde::{Deserialize, Serialize, de::DeserializeOwned};
23use std::{
24    collections::HashMap,
25    ffi::OsStr,
26    fs::create_dir_all,
27    io::{SeekFrom, Write as _},
28    path::{Path, PathBuf},
29    sync::{
30        Arc,
31        atomic::{AtomicUsize, Ordering},
32    },
33    time::Duration,
34    vec,
35};
36use tokio::{
37    fs::{self, File},
38    io::{AsyncReadExt as _, AsyncSeekExt as _, AsyncWriteExt as _},
39    sync::{RwLock, Semaphore, mpsc::Sender},
40};
41use tokio_util::codec::{BytesCodec, FramedRead};
42use walkdir::WalkDir;
43
44#[cfg(feature = "polars")]
45use polars::prelude::*;
46
/// Size of each part in a multipart snapshot upload: 100 MiB.
/// A plain compile-time value, so `const` is the idiomatic choice over `static`.
const PART_SIZE: usize = 100 * 1024 * 1024;
48
/// Maximum number of concurrent transfer tasks.
///
/// Honors a `MAX_TASKS` environment override when it parses as a `usize`;
/// otherwise derives a default from the CPU count.
fn max_tasks() -> usize {
    if let Some(limit) = std::env::var("MAX_TASKS")
        .ok()
        .and_then(|value| value.parse::<usize>().ok())
    {
        return limit;
    }

    // Default to half the CPUs, clamped to [2, 8].  The low ceiling keeps
    // concurrency modest enough to avoid timeouts on large file uploads.
    let cpus = std::thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(4);
    (cpus / 2).clamp(2, 8)
}
62
/// Sanitize a string for safe use as a single path component.
///
/// Trims whitespace, reduces the input to its final path component, and
/// replaces characters that are invalid in file names on common platforms
/// with underscores.  Inputs that are empty after trimming, or that would
/// resolve to the special `.`/`..` components, yield `"unnamed"`.
fn sanitize_path_component(name: &str) -> String {
    let trimmed = name.trim();
    if trimmed.is_empty() {
        return "unnamed".to_string();
    }

    // Keep only the last path component so inputs like "a/b" become "b".
    // `file_name()` returns None for inputs like "/", ".", and "..", in
    // which case we fall back to the raw trimmed string and rely on the
    // character substitution and dot checks below.
    let component = Path::new(trimmed)
        .file_name()
        .unwrap_or_else(|| OsStr::new(trimmed));

    let sanitized: String = component
        .to_string_lossy()
        .chars()
        .map(|c| match c {
            '/' | '\\' | ':' | '*' | '?' | '"' | '<' | '>' | '|' => '_',
            _ => c,
        })
        .collect();

    // Reject empty results and the special dot components: joining "." or
    // ".." onto an output directory could escape it (path traversal).
    if sanitized.is_empty() || sanitized == "." || sanitized == ".." {
        "unnamed".to_string()
    } else {
        sanitized
    }
}
88
/// Progress information for long-running operations.
///
/// This struct tracks the current progress of operations like file uploads,
/// downloads, or dataset processing. It provides the current count and total
/// count to enable progress reporting in applications.
///
/// # Examples
///
/// ```rust
/// use edgefirst_client::Progress;
///
/// let progress = Progress {
///     current: 25,
///     total: 100,
/// };
/// let percentage = (progress.current as f64 / progress.total as f64) * 100.0;
/// println!(
///     "Progress: {:.1}% ({}/{})",
///     percentage, progress.current, progress.total
/// );
/// ```
// Two machine words of plain data: `Copy` makes passing it around cheap and
// explicit, `Default` gives a zeroed starting point, and `PartialEq`/`Eq`
// let callers deduplicate or test progress updates.  All additive.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct Progress {
    /// Current number of completed items.
    pub current: usize,
    /// Total number of items to process.
    pub total: usize,
}
117
/// JSON-RPC 2.0 request envelope serialized for the Studio server.
#[derive(Serialize)]
struct RpcRequest<Params> {
    // Request identifier; the response carries a matching `id`.
    id: u64,
    // Protocol version string, always "2.0".
    jsonrpc: String,
    // Fully-qualified RPC method name, e.g. "dataset.list".
    method: String,
    // Method parameters; serialized as null when `None`.
    params: Option<Params>,
}
125
126impl<T> Default for RpcRequest<T> {
127    fn default() -> Self {
128        RpcRequest {
129            id: 0,
130            jsonrpc: "2.0".to_string(),
131            method: "".to_string(),
132            params: None,
133        }
134    }
135}
136
/// Error payload of a failed JSON-RPC response.
#[derive(Deserialize)]
struct RpcError {
    // Numeric JSON-RPC error code returned by the server.
    code: i32,
    // Human-readable error message.
    message: String,
}
142
/// JSON-RPC 2.0 response envelope.  Per the JSON-RPC convention, either
/// `error` or `result` is populated for a given response.
#[derive(Deserialize)]
struct RpcResponse<RpcResult> {
    #[allow(dead_code)]
    id: String,
    #[allow(dead_code)]
    jsonrpc: String,
    // Present when the call failed.
    error: Option<RpcError>,
    // Present when the call succeeded.
    result: Option<RpcResult>,
}
152
/// Deserialization target for RPC calls whose result payload is ignored.
#[derive(Deserialize)]
#[allow(dead_code)]
struct EmptyResult {}
156
/// Parameters for creating a snapshot with single-shot (non-multipart)
/// uploads.
#[derive(Debug, Serialize)]
#[allow(dead_code)]
struct SnapshotCreateParams {
    // Display name for the new snapshot.
    snapshot_name: String,
    // Object keys to request upload URLs for.
    keys: Vec<String>,
}
163
/// Server response to a snapshot create request.
#[derive(Debug, Deserialize)]
#[allow(dead_code)]
struct SnapshotCreateResult {
    // Identifier of the newly created snapshot.
    snapshot_id: SnapshotID,
    // Upload URLs — presumably one per requested key; confirm server-side.
    urls: Vec<String>,
}
170
/// Parameters for creating a snapshot using multipart uploads.
#[derive(Debug, Serialize)]
struct SnapshotCreateMultipartParams {
    // Display name for the new snapshot.
    snapshot_name: String,
    // Object keys to upload.
    keys: Vec<String>,
    // Size in bytes of each file, parallel to `keys` — presumably so the
    // server can decide how many part URLs to issue per key.
    file_sizes: Vec<usize>,
}
177
/// One element of the multipart-create response.  The response apparently
/// mixes the numeric snapshot ID with per-file part descriptors in a single
/// list, so this is deserialized untagged and matched by shape.
#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum SnapshotCreateMultipartResultField {
    Id(u64),
    Part(SnapshotPart),
}
184
/// Parameters for completing a multipart upload for a single object key.
#[derive(Debug, Serialize)]
struct SnapshotCompleteMultipartParams {
    // Object key whose multipart upload is being completed.
    key: String,
    // Upload session identifier issued when the multipart upload began.
    upload_id: String,
    // ETag returned for each uploaded part.
    etag_list: Vec<EtagPart>,
}
191
/// ETag/part-number pair for one completed upload part.  Fields are renamed
/// to the S3-style "ETag"/"PartNumber" casing expected on the wire.
#[derive(Debug, Clone, Serialize)]
struct EtagPart {
    #[serde(rename = "ETag")]
    etag: String,
    #[serde(rename = "PartNumber")]
    part_number: usize,
}
199
/// Per-file multipart upload descriptor returned by the server.
#[derive(Debug, Clone, Deserialize)]
struct SnapshotPart {
    // Object key this descriptor applies to; optional — TODO confirm when
    // the server omits it.
    key: Option<String>,
    // Multipart upload session identifier.
    upload_id: String,
    // One upload URL per part.
    urls: Vec<String>,
}
206
/// Parameters for updating the status of a snapshot.
#[derive(Debug, Serialize)]
struct SnapshotStatusParams {
    // Snapshot to update.
    snapshot_id: SnapshotID,
    // New status value; the accepted set is defined server-side.
    status: String,
}
212
/// Snapshot record returned by the status-update call.  Currently only
/// deserialized, never read — hence the dead_code allowances.
#[derive(Deserialize, Debug)]
struct SnapshotStatusResult {
    #[allow(dead_code)]
    pub id: SnapshotID,
    #[allow(dead_code)]
    pub uid: String,
    #[allow(dead_code)]
    pub description: String,
    #[allow(dead_code)]
    pub date: String,
    #[allow(dead_code)]
    pub status: String,
}
226
/// Parameters for listing images of a dataset.
#[derive(Serialize)]
#[allow(dead_code)]
struct ImageListParams {
    // Dataset-level filter.
    images_filter: ImagesFilter,
    // Extra key/value filters applied to image files; semantics are defined
    // server-side.
    image_files_filter: HashMap<String, String>,
    // When true, presumably only IDs are returned instead of full records —
    // confirm against the server API.
    only_ids: bool,
}
234
/// Dataset filter component of `ImageListParams`.
#[derive(Serialize)]
#[allow(dead_code)]
struct ImagesFilter {
    // Dataset whose images are listed.
    dataset_id: DatasetID,
}
240
241/// Main client for interacting with EdgeFirst Studio Server.
242///
243/// The EdgeFirst Client handles the connection to the EdgeFirst Studio Server
244/// and manages authentication, RPC calls, and data operations. It provides
245/// methods for managing projects, datasets, experiments, training sessions,
246/// and various utility functions for data processing.
247///
248/// The client supports multiple authentication methods and can work with both
249/// SaaS and self-hosted EdgeFirst Studio instances.
250///
251/// # Features
252///
253/// - **Authentication**: Token-based authentication with automatic persistence
254/// - **Dataset Management**: Upload, download, and manipulate datasets
255/// - **Project Operations**: Create and manage projects and experiments
256/// - **Training & Validation**: Submit and monitor ML training jobs
257/// - **Data Integration**: Convert between EdgeFirst datasets and popular
258///   formats
259/// - **Progress Tracking**: Real-time progress updates for long-running
260///   operations
261///
262/// # Examples
263///
264/// ```no_run
265/// use edgefirst_client::{Client, DatasetID};
266/// use std::str::FromStr;
267///
268/// # async fn example() -> Result<(), edgefirst_client::Error> {
269/// // Create a new client and authenticate
270/// let mut client = Client::new()?;
271/// let client = client
272///     .with_login("your-email@example.com", "password")
273///     .await?;
274///
275/// // Or use an existing token
276/// let base_client = Client::new()?;
277/// let client = base_client.with_token("your-token-here")?;
278///
279/// // Get organization and projects
280/// let org = client.organization().await?;
281/// let projects = client.projects(None).await?;
282///
283/// // Work with datasets
284/// let dataset_id = DatasetID::from_str("ds-abc123")?;
285/// let dataset = client.dataset(dataset_id).await?;
286/// # Ok(())
287/// # }
288/// ```
#[derive(Clone, Debug)]
pub struct Client {
    // Shared HTTP client configured with timeouts and a URL-based retry
    // policy (see `Client::new`); reqwest clients are cheap to clone.
    http: reqwest::Client,
    // Base URL of the EdgeFirst Studio server instance.
    url: String,
    // Authentication token, shared (and kept in sync) across clones.
    token: Arc<RwLock<String>>,
    // Optional on-disk location used to load/persist the token.
    token_path: Option<PathBuf>,
}
296
/// Private context struct for pagination operations
///
/// Borrowed inputs shared across paged sample-listing requests so each page
/// fetch reuses the same filters.
struct FetchContext<'a> {
    // Dataset being paged through.
    dataset_id: DatasetID,
    // Optional annotation set restriction.
    annotation_set_id: Option<AnnotationSetID>,
    // Group names (e.g. "train", "val") used to filter samples.
    groups: &'a [String],
    // Type names used to filter results.
    types: Vec<String>,
    // Label name -> numeric id map — presumably used to resolve label
    // filters; confirm at call sites.
    labels: &'a HashMap<String, u64>,
}
305
306impl Client {
307    /// Create a new unauthenticated client with the default saas server.  To
308    /// connect to a different server use the `with_server` method or with the
309    /// `with_token` method to create a client with a token which includes the
310    /// server instance name (test, stage, saas).
311    ///
312    /// This client is created without a token and will need to login before
313    /// using any methods that require authentication.  Use the `with_token`
314    /// method to create a client with a token.
315    pub fn new() -> Result<Self, Error> {
316        log_retry_configuration();
317
318        // Get timeout from environment or use default
319        let timeout_secs = std::env::var("EDGEFIRST_TIMEOUT")
320            .ok()
321            .and_then(|s| s.parse().ok())
322            .unwrap_or(30); // Default 30s timeout for API calls
323
324        // Create single HTTP client with URL-based retry policy
325        //
326        // The retry policy classifies requests into two categories:
327        // - StudioApi (*.edgefirst.studio/api): Fast-fail on auth errors, retry server
328        //   errors
329        // - FileIO (S3, CloudFront, etc.): Retry all transient errors for robustness
330        //
331        // This allows the same client to handle both API calls and file operations
332        // with appropriate retry behavior for each. See retry.rs for details.
333        let http = reqwest::Client::builder()
334            .connect_timeout(Duration::from_secs(10))
335            .timeout(Duration::from_secs(timeout_secs))
336            .pool_idle_timeout(Duration::from_secs(90))
337            .pool_max_idle_per_host(10)
338            .retry(create_retry_policy())
339            .build()?;
340
341        Ok(Client {
342            http,
343            url: "https://edgefirst.studio".to_string(),
344            token: Arc::new(tokio::sync::RwLock::new("".to_string())),
345            token_path: None,
346        })
347    }
348
349    /// Returns a new client connected to the specified server instance.  If a
350    /// token is already set in the client then it will be dropped as the token
351    /// is specific to the server instance.
352    pub fn with_server(&self, server: &str) -> Result<Self, Error> {
353        Ok(Client {
354            url: format!("https://{}.edgefirst.studio", server),
355            ..self.clone()
356        })
357    }
358
359    /// Returns a new client authenticated with the provided username and
360    /// password.
361    pub async fn with_login(&self, username: &str, password: &str) -> Result<Self, Error> {
362        let params = HashMap::from([("username", username), ("password", password)]);
363        let login: LoginResult = self
364            .rpc_without_auth("auth.login".to_owned(), Some(params))
365            .await?;
366
367        // Validate that the server returned a non-empty token
368        if login.token.is_empty() {
369            return Err(Error::EmptyToken);
370        }
371
372        Ok(Client {
373            token: Arc::new(tokio::sync::RwLock::new(login.token)),
374            ..self.clone()
375        })
376    }
377
378    /// Returns a new client which will load and save the token to the specified
379    /// path.
380    pub fn with_token_path(&self, token_path: Option<&Path>) -> Result<Self, Error> {
381        let token_path = match token_path {
382            Some(path) => path.to_path_buf(),
383            None => ProjectDirs::from("ai", "EdgeFirst", "EdgeFirst Studio")
384                .ok_or_else(|| {
385                    Error::IoError(std::io::Error::new(
386                        std::io::ErrorKind::NotFound,
387                        "Could not determine user config directory",
388                    ))
389                })?
390                .config_dir()
391                .join("token"),
392        };
393
394        debug!("Using token path: {:?}", token_path);
395
396        let token = match token_path.exists() {
397            true => std::fs::read_to_string(&token_path)?,
398            false => "".to_string(),
399        };
400
401        if !token.is_empty() {
402            match self.with_token(&token) {
403                Ok(client) => Ok(Client {
404                    token_path: Some(token_path),
405                    ..client
406                }),
407                Err(e) => {
408                    // Token is corrupted or invalid - remove it and continue with no token
409                    warn!(
410                        "Invalid or corrupted token file at {:?}: {:?}. Removing token file.",
411                        token_path, e
412                    );
413                    if let Err(remove_err) = std::fs::remove_file(&token_path) {
414                        warn!("Failed to remove corrupted token file: {:?}", remove_err);
415                    }
416                    Ok(Client {
417                        token_path: Some(token_path),
418                        ..self.clone()
419                    })
420                }
421            }
422        } else {
423            Ok(Client {
424                token_path: Some(token_path),
425                ..self.clone()
426            })
427        }
428    }
429
430    /// Returns a new client authenticated with the provided token.
431    pub fn with_token(&self, token: &str) -> Result<Self, Error> {
432        if token.is_empty() {
433            return Ok(self.clone());
434        }
435
436        let token_parts: Vec<&str> = token.split('.').collect();
437        if token_parts.len() != 3 {
438            return Err(Error::InvalidToken);
439        }
440
441        let decoded = base64::engine::general_purpose::STANDARD_NO_PAD
442            .decode(token_parts[1])
443            .map_err(|_| Error::InvalidToken)?;
444        let payload: HashMap<String, serde_json::Value> = serde_json::from_slice(&decoded)?;
445        let server = match payload.get("server") {
446            Some(value) => value.as_str().ok_or(Error::InvalidToken)?.to_string(),
447            None => return Err(Error::InvalidToken),
448        };
449
450        Ok(Client {
451            url: format!("https://{}.edgefirst.studio", server),
452            token: Arc::new(tokio::sync::RwLock::new(token.to_string())),
453            ..self.clone()
454        })
455    }
456
457    pub async fn save_token(&self) -> Result<(), Error> {
458        let path = self.token_path.clone().unwrap_or_else(|| {
459            ProjectDirs::from("ai", "EdgeFirst", "EdgeFirst Studio")
460                .map(|dirs| dirs.config_dir().join("token"))
461                .unwrap_or_else(|| PathBuf::from(".token"))
462        });
463
464        create_dir_all(path.parent().ok_or_else(|| {
465            Error::IoError(std::io::Error::new(
466                std::io::ErrorKind::InvalidInput,
467                "Token path has no parent directory",
468            ))
469        })?)?;
470        let mut file = std::fs::File::create(&path)?;
471        file.write_all(self.token.read().await.as_bytes())?;
472
473        debug!("Saved token to {:?}", path);
474
475        Ok(())
476    }
477
478    /// Return the version of the EdgeFirst Studio server for the current
479    /// client connection.
480    pub async fn version(&self) -> Result<String, Error> {
481        let version: HashMap<String, String> = self
482            .rpc_without_auth::<(), HashMap<String, String>>("version".to_owned(), None)
483            .await?;
484        let version = version.get("version").ok_or(Error::InvalidResponse)?;
485        Ok(version.to_owned())
486    }
487
488    /// Clear the token used to authenticate the client with the server.  If an
489    /// optional path was provided when creating the client, the token file
490    /// will also be cleared.
491    pub async fn logout(&self) -> Result<(), Error> {
492        {
493            let mut token = self.token.write().await;
494            *token = "".to_string();
495        }
496
497        if let Some(path) = &self.token_path
498            && path.exists()
499        {
500            fs::remove_file(path).await?;
501        }
502
503        Ok(())
504    }
505
506    /// Return the token used to authenticate the client with the server.  When
507    /// logging into the server using a username and password, the token is
508    /// returned by the server and stored in the client for future interactions.
509    pub async fn token(&self) -> String {
510        self.token.read().await.clone()
511    }
512
513    /// Verify the token used to authenticate the client with the server.  This
514    /// method is used to ensure that the token is still valid and has not
515    /// expired.  If the token is invalid, the server will return an error and
516    /// the client will need to login again.
517    pub async fn verify_token(&self) -> Result<(), Error> {
518        self.rpc::<(), LoginResult>("auth.verify_token".to_owned(), None)
519            .await?;
520        Ok::<(), Error>(())
521    }
522
523    /// Renew the token used to authenticate the client with the server.  This
524    /// method is used to refresh the token before it expires.  If the token
525    /// has already expired, the server will return an error and the client
526    /// will need to login again.
527    pub async fn renew_token(&self) -> Result<(), Error> {
528        let params = HashMap::from([("username".to_string(), self.username().await?)]);
529        let result: LoginResult = self
530            .rpc_without_auth("auth.refresh".to_owned(), Some(params))
531            .await?;
532
533        {
534            let mut token = self.token.write().await;
535            *token = result.token;
536        }
537
538        if self.token_path.is_some() {
539            self.save_token().await?;
540        }
541
542        Ok(())
543    }
544
545    async fn token_field(&self, field: &str) -> Result<serde_json::Value, Error> {
546        let token = self.token.read().await;
547        if token.is_empty() {
548            return Err(Error::EmptyToken);
549        }
550
551        let token_parts: Vec<&str> = token.split('.').collect();
552        if token_parts.len() != 3 {
553            return Err(Error::InvalidToken);
554        }
555
556        let decoded = base64::engine::general_purpose::STANDARD_NO_PAD
557            .decode(token_parts[1])
558            .map_err(|_| Error::InvalidToken)?;
559        let payload: HashMap<String, serde_json::Value> = serde_json::from_slice(&decoded)?;
560        match payload.get(field) {
561            Some(value) => Ok(value.to_owned()),
562            None => Err(Error::InvalidToken),
563        }
564    }
565
566    /// Returns the URL of the EdgeFirst Studio server for the current client.
567    pub fn url(&self) -> &str {
568        &self.url
569    }
570
571    /// Returns the username associated with the current token.
572    pub async fn username(&self) -> Result<String, Error> {
573        match self.token_field("username").await? {
574            serde_json::Value::String(username) => Ok(username),
575            _ => Err(Error::InvalidToken),
576        }
577    }
578
579    /// Returns the expiration time for the current token.
580    pub async fn token_expiration(&self) -> Result<DateTime<Utc>, Error> {
581        let ts = match self.token_field("exp").await? {
582            serde_json::Value::Number(exp) => exp.as_i64().ok_or(Error::InvalidToken)?,
583            _ => return Err(Error::InvalidToken),
584        };
585
586        match DateTime::<Utc>::from_timestamp_secs(ts) {
587            Some(dt) => Ok(dt),
588            None => Err(Error::InvalidToken),
589        }
590    }
591
592    /// Returns the organization information for the current user.
593    pub async fn organization(&self) -> Result<Organization, Error> {
594        self.rpc::<(), Organization>("org.get".to_owned(), None)
595            .await
596    }
597
598    /// Returns a list of projects available to the user.  The projects are
599    /// returned as a vector of Project objects.  If a name filter is
600    /// provided, only projects matching the filter are returned.
601    ///
602    /// Projects are the top-level organizational unit in EdgeFirst Studio.
603    /// Projects contain datasets, trainers, and trainer sessions.  Projects
604    /// are used to group related datasets and trainers together.
605    pub async fn projects(&self, name: Option<&str>) -> Result<Vec<Project>, Error> {
606        let projects = self
607            .rpc::<(), Vec<Project>>("project.list".to_owned(), None)
608            .await?;
609        if let Some(name) = name {
610            Ok(projects
611                .into_iter()
612                .filter(|p| p.name().contains(name))
613                .collect())
614        } else {
615            Ok(projects)
616        }
617    }
618
619    /// Return the project with the specified project ID.  If the project does
620    /// not exist, an error is returned.
621    pub async fn project(&self, project_id: ProjectID) -> Result<Project, Error> {
622        let params = HashMap::from([("project_id", project_id)]);
623        self.rpc("project.get".to_owned(), Some(params)).await
624    }
625
626    /// Returns a list of datasets available to the user.  The datasets are
627    /// returned as a vector of Dataset objects.  If a name filter is
628    /// provided, only datasets matching the filter are returned.
629    pub async fn datasets(
630        &self,
631        project_id: ProjectID,
632        name: Option<&str>,
633    ) -> Result<Vec<Dataset>, Error> {
634        let params = HashMap::from([("project_id", project_id)]);
635        let datasets: Vec<Dataset> = self.rpc("dataset.list".to_owned(), Some(params)).await?;
636        if let Some(name) = name {
637            Ok(datasets
638                .into_iter()
639                .filter(|d| d.name().contains(name))
640                .collect())
641        } else {
642            Ok(datasets)
643        }
644    }
645
646    /// Return the dataset with the specified dataset ID.  If the dataset does
647    /// not exist, an error is returned.
648    pub async fn dataset(&self, dataset_id: DatasetID) -> Result<Dataset, Error> {
649        let params = HashMap::from([("dataset_id", dataset_id)]);
650        self.rpc("dataset.get".to_owned(), Some(params)).await
651    }
652
653    /// Lists the labels for the specified dataset.
654    pub async fn labels(&self, dataset_id: DatasetID) -> Result<Vec<Label>, Error> {
655        let params = HashMap::from([("dataset_id", dataset_id)]);
656        self.rpc("label.list".to_owned(), Some(params)).await
657    }
658
659    /// Add a new label to the dataset with the specified name.
660    pub async fn add_label(&self, dataset_id: DatasetID, name: &str) -> Result<(), Error> {
661        let new_label = NewLabel {
662            dataset_id,
663            labels: vec![NewLabelObject {
664                name: name.to_owned(),
665            }],
666        };
667        let _: String = self.rpc("label.add2".to_owned(), Some(new_label)).await?;
668        Ok(())
669    }
670
671    /// Removes the label with the specified ID from the dataset.  Label IDs are
672    /// globally unique so the dataset_id is not required.
673    pub async fn remove_label(&self, label_id: u64) -> Result<(), Error> {
674        let params = HashMap::from([("label_id", label_id)]);
675        let _: String = self.rpc("label.del".to_owned(), Some(params)).await?;
676        Ok(())
677    }
678
679    /// Creates a new dataset in the specified project.
680    ///
681    /// # Arguments
682    ///
683    /// * `project_id` - The ID of the project to create the dataset in
684    /// * `name` - The name of the new dataset
685    /// * `description` - Optional description for the dataset
686    ///
687    /// # Returns
688    ///
689    /// Returns the dataset ID of the newly created dataset.
690    pub async fn create_dataset(
691        &self,
692        project_id: &str,
693        name: &str,
694        description: Option<&str>,
695    ) -> Result<DatasetID, Error> {
696        let mut params = HashMap::new();
697        params.insert("project_id", project_id);
698        params.insert("name", name);
699        if let Some(desc) = description {
700            params.insert("description", desc);
701        }
702
703        #[derive(Deserialize)]
704        struct CreateDatasetResult {
705            id: DatasetID,
706        }
707
708        let result: CreateDatasetResult =
709            self.rpc("dataset.create".to_owned(), Some(params)).await?;
710        Ok(result.id)
711    }
712
713    /// Deletes a dataset by marking it as deleted.
714    ///
715    /// # Arguments
716    ///
717    /// * `dataset_id` - The ID of the dataset to delete
718    ///
719    /// # Returns
720    ///
721    /// Returns `Ok(())` if the dataset was successfully marked as deleted.
722    pub async fn delete_dataset(&self, dataset_id: DatasetID) -> Result<(), Error> {
723        let params = HashMap::from([("id", dataset_id)]);
724        let _: String = self.rpc("dataset.delete".to_owned(), Some(params)).await?;
725        Ok(())
726    }
727
728    /// Updates the label with the specified ID to have the new name or index.
729    /// Label IDs cannot be changed.  Label IDs are globally unique so the
730    /// dataset_id is not required.
731    pub async fn update_label(&self, label: &Label) -> Result<(), Error> {
732        #[derive(Serialize)]
733        struct Params {
734            dataset_id: DatasetID,
735            label_id: u64,
736            label_name: String,
737            label_index: u64,
738        }
739
740        let _: String = self
741            .rpc(
742                "label.update".to_owned(),
743                Some(Params {
744                    dataset_id: label.dataset_id(),
745                    label_id: label.id(),
746                    label_name: label.name().to_owned(),
747                    label_index: label.index(),
748                }),
749            )
750            .await?;
751        Ok(())
752    }
753
754    /// Downloads dataset samples to the local filesystem.
755    ///
756    /// # Arguments
757    ///
758    /// * `dataset_id` - The unique identifier of the dataset
759    /// * `groups` - Dataset groups to include (e.g., "train", "val")
760    /// * `file_types` - File types to download (e.g., Image, LidarPcd)
761    /// * `output` - Local directory to save downloaded files
762    /// * `flatten` - If true, download all files to output root without
763    ///   sequence subdirectories. When flattening, filenames are automatically
764    ///   prefixed with `{sequence_name}_{frame}_` if not already present to
765    ///   avoid conflicts between sequences.
766    /// * `progress` - Optional channel for progress updates
767    ///
768    /// # Returns
769    ///
770    /// Returns `Ok(())` on success or an error if download fails.
771    ///
772    /// # Example
773    ///
774    /// ```rust,no_run
775    /// # use edgefirst_client::{Client, DatasetID, FileType};
776    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
777    /// let client = Client::new()?.with_token_path(None)?;
778    /// let dataset_id: DatasetID = "ds-123".try_into()?;
779    ///
780    /// // Download with sequence subdirectories (default)
781    /// client
782    ///     .download_dataset(
783    ///         dataset_id,
784    ///         &[],
785    ///         &[FileType::Image],
786    ///         "./data".into(),
787    ///         false,
788    ///         None,
789    ///     )
790    ///     .await?;
791    ///
792    /// // Download flattened (all files in one directory)
793    /// client
794    ///     .download_dataset(
795    ///         dataset_id,
796    ///         &[],
797    ///         &[FileType::Image],
798    ///         "./data".into(),
799    ///         true,
800    ///         None,
801    ///     )
802    ///     .await?;
803    /// # Ok(())
804    /// # }
805    /// ```
806    pub async fn download_dataset(
807        &self,
808        dataset_id: DatasetID,
809        groups: &[String],
810        file_types: &[FileType],
811        output: PathBuf,
812        flatten: bool,
813        progress: Option<Sender<Progress>>,
814    ) -> Result<(), Error> {
815        let samples = self
816            .samples(dataset_id, None, &[], groups, file_types, progress.clone())
817            .await?;
818        fs::create_dir_all(&output).await?;
819
820        let client = self.clone();
821        let file_types = file_types.to_vec();
822        let output = output.clone();
823
824        parallel_foreach_items(samples, progress, move |sample| {
825            let client = client.clone();
826            let file_types = file_types.clone();
827            let output = output.clone();
828
829            async move {
830                for file_type in file_types {
831                    if let Some(data) = sample.download(&client, file_type.clone()).await? {
832                        let (file_ext, is_image) = match file_type.clone() {
833                            FileType::Image => (
834                                infer::get(&data)
835                                    .expect("Failed to identify image file format for sample")
836                                    .extension()
837                                    .to_string(),
838                                true,
839                            ),
840                            other => (other.to_string(), false),
841                        };
842
843                        // Determine target directory based on sequence membership and flatten
844                        // option
845                        // - flatten=false + sequence_name: dataset/sequence_name/
846                        // - flatten=false + no sequence: dataset/ (root level)
847                        // - flatten=true: dataset/ (all files in output root)
848                        // NOTE: group (train/val/test) is NOT used for directory structure
849                        let sequence_dir = sample
850                            .sequence_name()
851                            .map(|name| sanitize_path_component(name));
852
853                        let target_dir = if flatten {
854                            output.clone()
855                        } else {
856                            sequence_dir
857                                .as_ref()
858                                .map(|seq| output.join(seq))
859                                .unwrap_or_else(|| output.clone())
860                        };
861                        fs::create_dir_all(&target_dir).await?;
862
863                        let sanitized_sample_name = sample
864                            .name()
865                            .map(|name| sanitize_path_component(&name))
866                            .unwrap_or_else(|| "unknown".to_string());
867
868                        let image_name = sample.image_name().map(sanitize_path_component);
869
870                        // Construct filename with smart prefixing for flatten mode
871                        // When flatten=true and sample belongs to a sequence:
872                        //   - Check if filename already starts with "{sequence_name}_"
873                        //   - If not, prepend "{sequence_name}_{frame}_" to avoid conflicts
874                        //   - If yes, use filename as-is (already uniquely named)
875                        let file_name = if is_image {
876                            if let Some(img_name) = image_name {
877                                Self::build_filename(
878                                    &img_name,
879                                    flatten,
880                                    sequence_dir.as_ref(),
881                                    sample.frame_number(),
882                                )
883                            } else {
884                                format!("{}.{}", sanitized_sample_name, file_ext)
885                            }
886                        } else {
887                            let base_name = format!("{}.{}", sanitized_sample_name, file_ext);
888                            Self::build_filename(
889                                &base_name,
890                                flatten,
891                                sequence_dir.as_ref(),
892                                sample.frame_number(),
893                            )
894                        };
895
896                        let file_path = target_dir.join(&file_name);
897
898                        let mut file = File::create(&file_path).await?;
899                        file.write_all(&data).await?;
900                    } else {
901                        warn!(
902                            "No data for sample: {}",
903                            sample
904                                .id()
905                                .map(|id| id.to_string())
906                                .unwrap_or_else(|| "unknown".to_string())
907                        );
908                    }
909                }
910
911                Ok(())
912            }
913        })
914        .await
915    }
916
917    /// Builds a filename with smart prefixing for flatten mode.
918    ///
919    /// When flattening sequences into a single directory, this function ensures
920    /// unique filenames by checking if the sequence prefix already exists and
921    /// adding it if necessary.
922    ///
923    /// # Logic
924    ///
925    /// - If `flatten=false`: returns `base_name` unchanged
926    /// - If `flatten=true` and no sequence: returns `base_name` unchanged
927    /// - If `flatten=true` and in sequence:
928    ///   - Already prefixed with `{sequence_name}_`: returns `base_name`
929    ///     unchanged
930    ///   - Not prefixed: returns `{sequence_name}_{frame}_{base_name}` or
931    ///     `{sequence_name}_{base_name}`
932    fn build_filename(
933        base_name: &str,
934        flatten: bool,
935        sequence_name: Option<&String>,
936        frame_number: Option<u32>,
937    ) -> String {
938        if !flatten || sequence_name.is_none() {
939            return base_name.to_string();
940        }
941
942        let seq_name = sequence_name.unwrap();
943        let prefix = format!("{}_", seq_name);
944
945        // Check if already prefixed with sequence name
946        if base_name.starts_with(&prefix) {
947            base_name.to_string()
948        } else {
949            // Add sequence (and optionally frame) prefix
950            match frame_number {
951                Some(frame) => format!("{}{}_{}", prefix, frame, base_name),
952                None => format!("{}{}", prefix, base_name),
953            }
954        }
955    }
956
957    /// List available annotation sets for the specified dataset.
958    pub async fn annotation_sets(
959        &self,
960        dataset_id: DatasetID,
961    ) -> Result<Vec<AnnotationSet>, Error> {
962        let params = HashMap::from([("dataset_id", dataset_id)]);
963        self.rpc("annset.list".to_owned(), Some(params)).await
964    }
965
966    /// Create a new annotation set for the specified dataset.
967    ///
968    /// # Arguments
969    ///
970    /// * `dataset_id` - The ID of the dataset to create the annotation set in
971    /// * `name` - The name of the new annotation set
972    /// * `description` - Optional description for the annotation set
973    ///
974    /// # Returns
975    ///
976    /// Returns the annotation set ID of the newly created annotation set.
977    pub async fn create_annotation_set(
978        &self,
979        dataset_id: DatasetID,
980        name: &str,
981        description: Option<&str>,
982    ) -> Result<AnnotationSetID, Error> {
983        #[derive(Serialize)]
984        struct Params<'a> {
985            dataset_id: DatasetID,
986            name: &'a str,
987            operator: &'a str,
988            #[serde(skip_serializing_if = "Option::is_none")]
989            description: Option<&'a str>,
990        }
991
992        #[derive(Deserialize)]
993        struct CreateAnnotationSetResult {
994            id: AnnotationSetID,
995        }
996
997        let username = self.username().await?;
998        let result: CreateAnnotationSetResult = self
999            .rpc(
1000                "annset.add".to_owned(),
1001                Some(Params {
1002                    dataset_id,
1003                    name,
1004                    operator: &username,
1005                    description,
1006                }),
1007            )
1008            .await?;
1009        Ok(result.id)
1010    }
1011
1012    /// Deletes an annotation set by marking it as deleted.
1013    ///
1014    /// # Arguments
1015    ///
1016    /// * `annotation_set_id` - The ID of the annotation set to delete
1017    ///
1018    /// # Returns
1019    ///
1020    /// Returns `Ok(())` if the annotation set was successfully marked as
1021    /// deleted.
1022    pub async fn delete_annotation_set(
1023        &self,
1024        annotation_set_id: AnnotationSetID,
1025    ) -> Result<(), Error> {
1026        let params = HashMap::from([("id", annotation_set_id)]);
1027        let _: String = self.rpc("annset.delete".to_owned(), Some(params)).await?;
1028        Ok(())
1029    }
1030
1031    /// Retrieve the annotation set with the specified ID.
1032    pub async fn annotation_set(
1033        &self,
1034        annotation_set_id: AnnotationSetID,
1035    ) -> Result<AnnotationSet, Error> {
1036        let params = HashMap::from([("annotation_set_id", annotation_set_id)]);
1037        self.rpc("annset.get".to_owned(), Some(params)).await
1038    }
1039
1040    /// Get the annotations for the specified annotation set with the
1041    /// requested annotation types.  The annotation types are used to filter
1042    /// the annotations returned.  The groups parameter is used to filter for
1043    /// dataset groups (train, val, test).  Images which do not have any
1044    /// annotations are also included in the result as long as they are in the
1045    /// requested groups (when specified).
1046    ///
1047    /// The result is a vector of Annotations objects which contain the
1048    /// full dataset along with the annotations for the specified types.
1049    ///
1050    /// To get the annotations as a DataFrame, use the `annotations_dataframe`
1051    /// method instead.
1052    pub async fn annotations(
1053        &self,
1054        annotation_set_id: AnnotationSetID,
1055        groups: &[String],
1056        annotation_types: &[AnnotationType],
1057        progress: Option<Sender<Progress>>,
1058    ) -> Result<Vec<Annotation>, Error> {
1059        let dataset_id = self.annotation_set(annotation_set_id).await?.dataset_id();
1060        let labels = self
1061            .labels(dataset_id)
1062            .await?
1063            .into_iter()
1064            .map(|label| (label.name().to_string(), label.index()))
1065            .collect::<HashMap<_, _>>();
1066        let total = self
1067            .samples_count(
1068                dataset_id,
1069                Some(annotation_set_id),
1070                annotation_types,
1071                groups,
1072                &[],
1073            )
1074            .await?
1075            .total as usize;
1076
1077        if total == 0 {
1078            return Ok(vec![]);
1079        }
1080
1081        let context = FetchContext {
1082            dataset_id,
1083            annotation_set_id: Some(annotation_set_id),
1084            groups,
1085            types: annotation_types.iter().map(|t| t.to_string()).collect(),
1086            labels: &labels,
1087        };
1088
1089        self.fetch_annotations_paginated(context, total, progress)
1090            .await
1091    }
1092
    /// Retrieves all pages of `samples.list` for the given context and
    /// flattens each page's samples into annotation records.
    ///
    /// The server paginates via an opaque continue token: `None` requests
    /// the first page, and an empty or absent token in a response marks the
    /// last page.  After each page a `Progress { current, total }` update is
    /// sent on `progress`; send failures are ignored (best-effort).
    async fn fetch_annotations_paginated(
        &self,
        context: FetchContext<'_>,
        total: usize,
        progress: Option<Sender<Progress>>,
    ) -> Result<Vec<Annotation>, Error> {
        let mut annotations = vec![];
        // Pagination cursor; None requests the first page.
        let mut continue_token: Option<String> = None;
        let mut current = 0;

        loop {
            let params = SamplesListParams {
                dataset_id: context.dataset_id,
                annotation_set_id: context.annotation_set_id,
                types: context.types.clone(),
                group_names: context.groups.to_vec(),
                // Moved into the request; refreshed from the response below.
                continue_token,
            };

            let result: SamplesListResult =
                self.rpc("samples.list".to_owned(), Some(params)).await?;
            current += result.samples.len();
            continue_token = result.continue_token;

            // An empty page means there is nothing further to process.
            if result.samples.is_empty() {
                break;
            }

            // Flatten this page's samples into annotation records.
            self.process_sample_annotations(&result.samples, context.labels, &mut annotations);

            if let Some(progress) = &progress {
                // Best-effort delivery; a closed receiver is not an error.
                let _ = progress.send(Progress { current, total }).await;
            }

            // Keep paging while the server returns a non-empty token.
            match &continue_token {
                Some(token) if !token.is_empty() => continue,
                _ => break,
            }
        }

        // Explicitly close the progress channel before returning.
        drop(progress);
        Ok(annotations)
    }
1136
1137    fn process_sample_annotations(
1138        &self,
1139        samples: &[Sample],
1140        labels: &HashMap<String, u64>,
1141        annotations: &mut Vec<Annotation>,
1142    ) {
1143        for sample in samples {
1144            if sample.annotations().is_empty() {
1145                let mut annotation = Annotation::new();
1146                annotation.set_sample_id(sample.id());
1147                annotation.set_name(sample.name());
1148                annotation.set_sequence_name(sample.sequence_name().cloned());
1149                annotation.set_frame_number(sample.frame_number());
1150                annotation.set_group(sample.group().cloned());
1151                annotations.push(annotation);
1152                continue;
1153            }
1154
1155            for annotation in sample.annotations() {
1156                let mut annotation = annotation.clone();
1157                annotation.set_sample_id(sample.id());
1158                annotation.set_name(sample.name());
1159                annotation.set_sequence_name(sample.sequence_name().cloned());
1160                annotation.set_frame_number(sample.frame_number());
1161                annotation.set_group(sample.group().cloned());
1162                Self::set_label_index_from_map(&mut annotation, labels);
1163                annotations.push(annotation);
1164            }
1165        }
1166    }
1167
1168    /// Helper to parse frame number from image_name when sequence_name is
1169    /// present. This ensures frame_number is always derived from the image
1170    /// filename, not from the server's frame_number field (which may be
1171    /// inconsistent).
1172    ///
1173    /// Returns Some(frame_number) if sequence_name is present and frame can be
1174    /// parsed, otherwise None.
1175    fn parse_frame_from_image_name(
1176        image_name: Option<&String>,
1177        sequence_name: Option<&String>,
1178    ) -> Option<u32> {
1179        use std::path::Path;
1180
1181        let sequence = sequence_name?;
1182        let name = image_name?;
1183
1184        // Extract stem (remove extension)
1185        let stem = Path::new(name).file_stem().and_then(|s| s.to_str())?;
1186
1187        // Parse frame from format: "sequence_XXX" where XXX is the frame number
1188        stem.strip_prefix(sequence)
1189            .and_then(|suffix| suffix.strip_prefix('_'))
1190            .and_then(|frame_str| frame_str.parse::<u32>().ok())
1191    }
1192
1193    /// Helper to set label index from a label map
1194    fn set_label_index_from_map(annotation: &mut Annotation, labels: &HashMap<String, u64>) {
1195        if let Some(label) = annotation.label() {
1196            annotation.set_label_index(Some(labels[label.as_str()]));
1197        }
1198    }
1199
1200    pub async fn samples_count(
1201        &self,
1202        dataset_id: DatasetID,
1203        annotation_set_id: Option<AnnotationSetID>,
1204        annotation_types: &[AnnotationType],
1205        groups: &[String],
1206        types: &[FileType],
1207    ) -> Result<SamplesCountResult, Error> {
1208        let types = annotation_types
1209            .iter()
1210            .map(|t| t.to_string())
1211            .chain(types.iter().map(|t| t.to_string()))
1212            .collect::<Vec<_>>();
1213
1214        let params = SamplesListParams {
1215            dataset_id,
1216            annotation_set_id,
1217            group_names: groups.to_vec(),
1218            types,
1219            continue_token: None,
1220        };
1221
1222        self.rpc("samples.count".to_owned(), Some(params)).await
1223    }
1224
1225    pub async fn samples(
1226        &self,
1227        dataset_id: DatasetID,
1228        annotation_set_id: Option<AnnotationSetID>,
1229        annotation_types: &[AnnotationType],
1230        groups: &[String],
1231        types: &[FileType],
1232        progress: Option<Sender<Progress>>,
1233    ) -> Result<Vec<Sample>, Error> {
1234        let types_vec = annotation_types
1235            .iter()
1236            .map(|t| t.to_string())
1237            .chain(types.iter().map(|t| t.to_string()))
1238            .collect::<Vec<_>>();
1239        let labels = self
1240            .labels(dataset_id)
1241            .await?
1242            .into_iter()
1243            .map(|label| (label.name().to_string(), label.index()))
1244            .collect::<HashMap<_, _>>();
1245        let total = self
1246            .samples_count(dataset_id, annotation_set_id, annotation_types, groups, &[])
1247            .await?
1248            .total as usize;
1249
1250        if total == 0 {
1251            return Ok(vec![]);
1252        }
1253
1254        let context = FetchContext {
1255            dataset_id,
1256            annotation_set_id,
1257            groups,
1258            types: types_vec,
1259            labels: &labels,
1260        };
1261
1262        self.fetch_samples_paginated(context, total, progress).await
1263    }
1264
1265    async fn fetch_samples_paginated(
1266        &self,
1267        context: FetchContext<'_>,
1268        total: usize,
1269        progress: Option<Sender<Progress>>,
1270    ) -> Result<Vec<Sample>, Error> {
1271        let mut samples = vec![];
1272        let mut continue_token: Option<String> = None;
1273        let mut current = 0;
1274
1275        loop {
1276            let params = SamplesListParams {
1277                dataset_id: context.dataset_id,
1278                annotation_set_id: context.annotation_set_id,
1279                types: context.types.clone(),
1280                group_names: context.groups.to_vec(),
1281                continue_token: continue_token.clone(),
1282            };
1283
1284            let result: SamplesListResult =
1285                self.rpc("samples.list".to_owned(), Some(params)).await?;
1286            current += result.samples.len();
1287            continue_token = result.continue_token;
1288
1289            if result.samples.is_empty() {
1290                break;
1291            }
1292
1293            samples.append(
1294                &mut result
1295                    .samples
1296                    .into_iter()
1297                    .map(|s| {
1298                        // Use server's frame_number if valid (>= 0 after deserialization)
1299                        // Otherwise parse from image_name as fallback
1300                        // This ensures we respect explicit frame_number from uploads
1301                        // while still handling legacy data that only has filename encoding
1302                        let frame_number = s.frame_number.or_else(|| {
1303                            Self::parse_frame_from_image_name(
1304                                s.image_name.as_ref(),
1305                                s.sequence_name.as_ref(),
1306                            )
1307                        });
1308
1309                        let mut anns = s.annotations().to_vec();
1310                        for ann in &mut anns {
1311                            // Set annotation fields from parent sample
1312                            ann.set_name(s.name());
1313                            ann.set_group(s.group().cloned());
1314                            ann.set_sequence_name(s.sequence_name().cloned());
1315                            ann.set_frame_number(frame_number);
1316                            Self::set_label_index_from_map(ann, context.labels);
1317                        }
1318                        s.with_annotations(anns).with_frame_number(frame_number)
1319                    })
1320                    .collect::<Vec<_>>(),
1321            );
1322
1323            if let Some(progress) = &progress {
1324                let _ = progress.send(Progress { current, total }).await;
1325            }
1326
1327            match &continue_token {
1328                Some(token) if !token.is_empty() => continue,
1329                _ => break,
1330            }
1331        }
1332
1333        drop(progress);
1334        Ok(samples)
1335    }
1336
1337    /// Populates (imports) samples into a dataset using the `samples.populate2`
1338    /// API.
1339    ///
1340    /// This method creates new samples in the specified dataset, optionally
1341    /// with annotations and sensor data files. For each sample, the `files`
1342    /// field is checked for local file paths. If a filename is a valid path
1343    /// to an existing file, the file will be automatically uploaded to S3
1344    /// using presigned URLs returned by the server. The filename in the
1345    /// request is replaced with the basename (path removed) before sending
1346    /// to the server.
1347    ///
1348    /// # Important Notes
1349    ///
1350    /// - **`annotation_set_id` is REQUIRED** when importing samples with
1351    ///   annotations. Without it, the server will accept the request but will
1352    ///   not save the annotation data. Use [`Client::annotation_sets`] to query
1353    ///   available annotation sets for a dataset, or create a new one via the
1354    ///   Studio UI.
1355    /// - **Box2d coordinates must be normalized** (0.0-1.0 range) for bounding
1356    ///   boxes. Divide pixel coordinates by image width/height before creating
1357    ///   [`Box2d`](crate::Box2d) annotations.
1358    /// - **Files are uploaded automatically** when the filename is a valid
1359    ///   local path. The method will replace the full path with just the
1360    ///   basename before sending to the server.
1361    /// - **Image dimensions are extracted automatically** for image files using
1362    ///   the `imagesize` crate. The width/height are sent to the server, but
1363    ///   note that the server currently doesn't return these fields when
1364    ///   fetching samples back.
1365    /// - **UUIDs are generated automatically** if not provided. If you need
1366    ///   deterministic UUIDs, set `sample.uuid` explicitly before calling. Note
1367    ///   that the server doesn't currently return UUIDs in sample queries.
1368    ///
1369    /// # Arguments
1370    ///
1371    /// * `dataset_id` - The ID of the dataset to populate
1372    /// * `annotation_set_id` - **Required** if samples contain annotations,
1373    ///   otherwise they will be ignored. Query with
1374    ///   [`Client::annotation_sets`].
1375    /// * `samples` - Vector of samples to import with metadata and file
1376    ///   references. For files, use the full local path - it will be uploaded
1377    ///   automatically. UUIDs and image dimensions will be
1378    ///   auto-generated/extracted if not provided.
1379    ///
1380    /// # Returns
1381    ///
1382    /// Returns the API result with sample UUIDs and upload status.
1383    ///
1384    /// # Example
1385    ///
1386    /// ```no_run
1387    /// use edgefirst_client::{Annotation, Box2d, Client, DatasetID, Sample, SampleFile};
1388    ///
1389    /// # async fn example() -> Result<(), edgefirst_client::Error> {
1390    /// # let client = Client::new()?.with_login("user", "pass").await?;
1391    /// # let dataset_id = DatasetID::from(1);
1392    /// // Query available annotation sets for the dataset
1393    /// let annotation_sets = client.annotation_sets(dataset_id).await?;
1394    /// let annotation_set_id = annotation_sets
1395    ///     .first()
1396    ///     .ok_or_else(|| {
1397    ///         edgefirst_client::Error::InvalidParameters("No annotation sets found".to_string())
1398    ///     })?
1399    ///     .id();
1400    ///
1401    /// // Create sample with annotation (UUID will be auto-generated)
1402    /// let mut sample = Sample::new();
1403    /// sample.width = Some(1920);
1404    /// sample.height = Some(1080);
1405    /// sample.group = Some("train".to_string());
1406    ///
1407    /// // Add file - use full path to local file, it will be uploaded automatically
1408    /// sample.files = vec![SampleFile::with_filename(
1409    ///     "image".to_string(),
1410    ///     "/path/to/image.jpg".to_string(),
1411    /// )];
1412    ///
1413    /// // Add bounding box annotation with NORMALIZED coordinates (0.0-1.0)
1414    /// let mut annotation = Annotation::new();
1415    /// annotation.set_label(Some("person".to_string()));
1416    /// // Normalize pixel coordinates by dividing by image dimensions
1417    /// let bbox = Box2d::new(0.5, 0.5, 0.25, 0.25); // (x, y, w, h) normalized
1418    /// annotation.set_box2d(Some(bbox));
1419    /// sample.annotations = vec![annotation];
1420    ///
1421    /// // Populate with annotation_set_id (REQUIRED for annotations)
1422    /// let result = client
1423    ///     .populate_samples(dataset_id, Some(annotation_set_id), vec![sample], None)
1424    ///     .await?;
1425    /// # Ok(())
1426    /// # }
1427    /// ```
1428    pub async fn populate_samples(
1429        &self,
1430        dataset_id: DatasetID,
1431        annotation_set_id: Option<AnnotationSetID>,
1432        samples: Vec<Sample>,
1433        progress: Option<Sender<Progress>>,
1434    ) -> Result<Vec<crate::SamplesPopulateResult>, Error> {
1435        use crate::api::SamplesPopulateParams;
1436
1437        // Track which files need to be uploaded
1438        let mut files_to_upload: Vec<(String, String, PathBuf, String)> = Vec::new();
1439
1440        // Process samples to detect local files and generate UUIDs
1441        let samples = self.prepare_samples_for_upload(samples, &mut files_to_upload)?;
1442
1443        let has_files_to_upload = !files_to_upload.is_empty();
1444
1445        // Call populate API with presigned_urls=true if we have files to upload
1446        let params = SamplesPopulateParams {
1447            dataset_id,
1448            annotation_set_id,
1449            presigned_urls: Some(has_files_to_upload),
1450            samples,
1451        };
1452
1453        let results: Vec<crate::SamplesPopulateResult> = self
1454            .rpc("samples.populate2".to_owned(), Some(params))
1455            .await?;
1456
1457        // Upload files if we have any
1458        if has_files_to_upload {
1459            self.upload_sample_files(&results, files_to_upload, progress)
1460                .await?;
1461        }
1462
1463        Ok(results)
1464    }
1465
1466    fn prepare_samples_for_upload(
1467        &self,
1468        samples: Vec<Sample>,
1469        files_to_upload: &mut Vec<(String, String, PathBuf, String)>,
1470    ) -> Result<Vec<Sample>, Error> {
1471        Ok(samples
1472            .into_iter()
1473            .map(|mut sample| {
1474                // Generate UUID if not provided
1475                if sample.uuid.is_none() {
1476                    sample.uuid = Some(uuid::Uuid::new_v4().to_string());
1477                }
1478
1479                let sample_uuid = sample.uuid.clone().expect("UUID just set above");
1480
1481                // Process files: detect local paths and queue for upload
1482                let files_copy = sample.files.clone();
1483                let updated_files: Vec<crate::SampleFile> = files_copy
1484                    .iter()
1485                    .map(|file| {
1486                        self.process_sample_file(file, &sample_uuid, &mut sample, files_to_upload)
1487                    })
1488                    .collect();
1489
1490                sample.files = updated_files;
1491                sample
1492            })
1493            .collect())
1494    }
1495
1496    fn process_sample_file(
1497        &self,
1498        file: &crate::SampleFile,
1499        sample_uuid: &str,
1500        sample: &mut Sample,
1501        files_to_upload: &mut Vec<(String, String, PathBuf, String)>,
1502    ) -> crate::SampleFile {
1503        use std::path::Path;
1504
1505        if let Some(filename) = file.filename() {
1506            let path = Path::new(filename);
1507
1508            // Check if this is a valid local file path
1509            if path.exists()
1510                && path.is_file()
1511                && let Some(basename) = path.file_name().and_then(|s| s.to_str())
1512            {
1513                // For image files, try to extract dimensions if not already set
1514                if file.file_type() == "image"
1515                    && (sample.width.is_none() || sample.height.is_none())
1516                    && let Ok(size) = imagesize::size(path)
1517                {
1518                    sample.width = Some(size.width as u32);
1519                    sample.height = Some(size.height as u32);
1520                }
1521
1522                // Store the full path for later upload
1523                files_to_upload.push((
1524                    sample_uuid.to_string(),
1525                    file.file_type().to_string(),
1526                    path.to_path_buf(),
1527                    basename.to_string(),
1528                ));
1529
1530                // Return SampleFile with just the basename
1531                return crate::SampleFile::with_filename(
1532                    file.file_type().to_string(),
1533                    basename.to_string(),
1534                );
1535            }
1536        }
1537        // Return the file unchanged if not a local path
1538        file.clone()
1539    }
1540
1541    async fn upload_sample_files(
1542        &self,
1543        results: &[crate::SamplesPopulateResult],
1544        files_to_upload: Vec<(String, String, PathBuf, String)>,
1545        progress: Option<Sender<Progress>>,
1546    ) -> Result<(), Error> {
1547        // Build a map from (sample_uuid, basename) -> local_path
1548        let mut upload_map: HashMap<(String, String), PathBuf> = HashMap::new();
1549        for (uuid, _file_type, path, basename) in files_to_upload {
1550            upload_map.insert((uuid, basename), path);
1551        }
1552
1553        let http = self.http.clone();
1554
1555        // Extract the data we need for parallel upload
1556        let upload_tasks: Vec<_> = results
1557            .iter()
1558            .map(|result| (result.uuid.clone(), result.urls.clone()))
1559            .collect();
1560
1561        parallel_foreach_items(upload_tasks, progress.clone(), move |(uuid, urls)| {
1562            let http = http.clone();
1563            let upload_map = upload_map.clone();
1564
1565            async move {
1566                // Upload all files for this sample
1567                for url_info in &urls {
1568                    if let Some(local_path) =
1569                        upload_map.get(&(uuid.clone(), url_info.filename.clone()))
1570                    {
1571                        // Upload the file
1572                        upload_file_to_presigned_url(
1573                            http.clone(),
1574                            &url_info.url,
1575                            local_path.clone(),
1576                        )
1577                        .await?;
1578                    }
1579                }
1580
1581                Ok(())
1582            }
1583        })
1584        .await
1585    }
1586
1587    pub async fn download(&self, url: &str) -> Result<Vec<u8>, Error> {
1588        // Uses default 120s timeout from client
1589        let resp = self.http.get(url).send().await?;
1590
1591        if !resp.status().is_success() {
1592            return Err(Error::HttpError(resp.error_for_status().unwrap_err()));
1593        }
1594
1595        let bytes = resp.bytes().await?;
1596        Ok(bytes.to_vec())
1597    }
1598
1599    /// Get the AnnotationGroup for the specified annotation set with the
1600    /// requested annotation types.  The annotation type is used to filter
1601    /// the annotations returned.  Images which do not have any annotations
1602    /// are included in the result.
1603    ///
1604    /// Get annotations as a DataFrame (2025.01 schema).
1605    ///
1606    /// **DEPRECATED**: Use [`Client::samples_dataframe()`] instead for full
1607    /// 2025.10 schema support including optional metadata columns.
1608    ///
1609    /// The result is a DataFrame following the EdgeFirst Dataset Format
1610    /// definition with 9 columns (original schema). Does not include new
1611    /// optional columns added in 2025.10.
1612    ///
1613    /// # Migration
1614    ///
1615    /// ```rust,no_run
1616    /// # use edgefirst_client::Client;
1617    /// # async fn example() -> Result<(), edgefirst_client::Error> {
1618    /// # let client = Client::new()?;
1619    /// # let dataset_id = 1.into();
1620    /// # let annotation_set_id = 1.into();
1621    /// # let groups = vec![];
1622    /// # let types = vec![];
1623    /// // OLD (deprecated):
1624    /// let df = client
1625    ///     .annotations_dataframe(annotation_set_id, &groups, &types, None)
1626    ///     .await?;
1627    ///
1628    /// // NEW (recommended):
1629    /// let df = client
1630    ///     .samples_dataframe(dataset_id, Some(annotation_set_id), &groups, &types, None)
1631    ///     .await?;
1632    /// # Ok(())
1633    /// # }
1634    /// ```
1635    ///
1636    /// To get the annotations as a vector of Annotation objects, use the
1637    /// `annotations` method instead.
1638    #[deprecated(
1639        since = "0.8.0",
1640        note = "Use `samples_dataframe()` for complete 2025.10 schema support"
1641    )]
1642    #[cfg(feature = "polars")]
1643    pub async fn annotations_dataframe(
1644        &self,
1645        annotation_set_id: AnnotationSetID,
1646        groups: &[String],
1647        types: &[AnnotationType],
1648        progress: Option<Sender<Progress>>,
1649    ) -> Result<DataFrame, Error> {
1650        use crate::dataset::annotations_dataframe;
1651
1652        let annotations = self
1653            .annotations(annotation_set_id, groups, types, progress)
1654            .await?;
1655        #[allow(deprecated)]
1656        annotations_dataframe(&annotations)
1657    }
1658
1659    /// Get samples as a DataFrame with complete 2025.10 schema.
1660    ///
1661    /// This is the recommended method for obtaining dataset annotations in
1662    /// DataFrame format. It includes all sample metadata (size, location,
1663    /// pose, degradation) as optional columns.
1664    ///
1665    /// # Arguments
1666    ///
1667    /// * `dataset_id` - Dataset identifier
1668    /// * `annotation_set_id` - Optional annotation set filter
1669    /// * `groups` - Dataset groups to include (train, val, test)
1670    /// * `types` - Annotation types to filter (bbox, box3d, mask)
1671    /// * `progress` - Optional progress callback
1672    ///
1673    /// # Example
1674    ///
1675    /// ```rust,no_run
1676    /// use edgefirst_client::Client;
1677    ///
1678    /// # async fn example() -> Result<(), edgefirst_client::Error> {
1679    /// # let client = Client::new()?;
1680    /// # let dataset_id = 1.into();
1681    /// # let annotation_set_id = 1.into();
1682    /// let df = client
1683    ///     .samples_dataframe(
1684    ///         dataset_id,
1685    ///         Some(annotation_set_id),
1686    ///         &["train".to_string()],
1687    ///         &[],
1688    ///         None,
1689    ///     )
1690    ///     .await?;
1691    /// println!("DataFrame shape: {:?}", df.shape());
1692    /// # Ok(())
1693    /// # }
1694    /// ```
1695    #[cfg(feature = "polars")]
1696    pub async fn samples_dataframe(
1697        &self,
1698        dataset_id: DatasetID,
1699        annotation_set_id: Option<AnnotationSetID>,
1700        groups: &[String],
1701        types: &[AnnotationType],
1702        progress: Option<Sender<Progress>>,
1703    ) -> Result<DataFrame, Error> {
1704        use crate::dataset::samples_dataframe;
1705
1706        let samples = self
1707            .samples(dataset_id, annotation_set_id, types, groups, &[], progress)
1708            .await?;
1709        samples_dataframe(&samples)
1710    }
1711
1712    /// List available snapshots.  If a name is provided, only snapshots
1713    /// containing that name are returned.
1714    pub async fn snapshots(&self, name: Option<&str>) -> Result<Vec<Snapshot>, Error> {
1715        let snapshots: Vec<Snapshot> = self
1716            .rpc::<(), Vec<Snapshot>>("snapshots.list".to_owned(), None)
1717            .await?;
1718        if let Some(name) = name {
1719            Ok(snapshots
1720                .into_iter()
1721                .filter(|s| s.description().contains(name))
1722                .collect())
1723        } else {
1724            Ok(snapshots)
1725        }
1726    }
1727
1728    /// Get the snapshot with the specified id.
1729    pub async fn snapshot(&self, snapshot_id: SnapshotID) -> Result<Snapshot, Error> {
1730        let params = HashMap::from([("snapshot_id", snapshot_id)]);
1731        self.rpc("snapshots.get".to_owned(), Some(params)).await
1732    }
1733
1734    /// Create a new snapshot from an MCAP file or EdgeFirst Dataset directory.
1735    ///
1736    /// Snapshots are frozen datasets in EdgeFirst Dataset Format (Zip/Arrow
1737    /// pairs) that serve two primary purposes:
1738    ///
1739    /// 1. **MCAP uploads**: Upload MCAP files containing sensor data (images,
1740    ///    point clouds, IMU, GPS) to EdgeFirst Studio. Snapshots can then be
1741    ///    restored with AGTG (Automatic Ground Truth Generation) and optional
1742    ///    auto-depth processing.
1743    ///
1744    /// 2. **Dataset exchange**: Export datasets for backup, sharing, or
1745    ///    migration between EdgeFirst Studio instances using the create →
1746    ///    download → upload → restore workflow.
1747    ///
1748    /// Large files are automatically chunked into 100MB parts and uploaded
1749    /// concurrently using S3 multipart upload with presigned URLs. Each chunk
1750    /// is streamed without loading into memory, maintaining constant memory
1751    /// usage.
1752    ///
1753    /// **Concurrency tuning**: Set `MAX_TASKS` to control concurrent
1754    /// uploads (default: half of CPU cores, min 2, max 8). Lower values work
1755    /// better for large files to avoid timeout issues. Higher values (16-32)
1756    /// are better for many small files.
1757    ///
1758    /// # Arguments
1759    ///
1760    /// * `path` - Local file path to MCAP file or directory containing
1761    ///   EdgeFirst Dataset Format files (Zip/Arrow pairs)
1762    /// * `progress` - Optional channel to receive upload progress updates
1763    ///
1764    /// # Returns
1765    ///
1766    /// Returns a `Snapshot` object with ID, description, status, path, and
1767    /// creation timestamp on success.
1768    ///
1769    /// # Errors
1770    ///
1771    /// Returns an error if:
1772    /// * Path doesn't exist or contains invalid UTF-8
1773    /// * File format is invalid (not MCAP or EdgeFirst Dataset Format)
1774    /// * Upload fails or network error occurs
1775    /// * Server rejects the snapshot
1776    ///
1777    /// # Example
1778    ///
1779    /// ```no_run
1780    /// # use edgefirst_client::{Client, Progress};
1781    /// # use tokio::sync::mpsc;
1782    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1783    /// let client = Client::new()?.with_token_path(None)?;
1784    ///
1785    /// // Upload MCAP file with progress tracking
1786    /// let (tx, mut rx) = mpsc::channel(1);
1787    /// tokio::spawn(async move {
1788    ///     while let Some(Progress { current, total }) = rx.recv().await {
1789    ///         println!(
1790    ///             "Upload: {}/{} bytes ({:.1}%)",
1791    ///             current,
1792    ///             total,
1793    ///             (current as f64 / total as f64) * 100.0
1794    ///         );
1795    ///     }
1796    /// });
1797    /// let snapshot = client.create_snapshot("data.mcap", Some(tx)).await?;
1798    /// println!("Created snapshot: {:?}", snapshot.id());
1799    ///
1800    /// // Upload dataset directory (no progress)
1801    /// let snapshot = client.create_snapshot("./dataset_export/", None).await?;
1802    /// # Ok(())
1803    /// # }
1804    /// ```
1805    ///
1806    /// # See Also
1807    ///
1808    /// * [`restore_snapshot`](Self::restore_snapshot) - Restore snapshot to
1809    ///   dataset
1810    /// * [`download_snapshot`](Self::download_snapshot) - Download snapshot
1811    ///   data
1812    /// * [`delete_snapshot`](Self::delete_snapshot) - Delete snapshot
1813    /// * [AGTG Documentation](https://doc.edgefirst.ai/latest/datasets/tutorials/annotations/automatic/)
1814    /// * [Snapshots Guide](https://doc.edgefirst.ai/latest/studio/snapshots/)
    pub async fn create_snapshot(
        &self,
        path: &str,
        progress: Option<Sender<Progress>>,
    ) -> Result<Snapshot, Error> {
        let path = Path::new(path);

        // Directories take the multi-file path (EdgeFirst Dataset Format);
        // the rest of this function handles the single-file (MCAP) case.
        if path.is_dir() {
            let path_str = path.to_str().ok_or_else(|| {
                Error::IoError(std::io::Error::new(
                    std::io::ErrorKind::InvalidInput,
                    "Path contains invalid UTF-8",
                ))
            })?;
            return self.create_snapshot_folder(path_str, progress).await;
        }

        // The file's basename doubles as both the snapshot name and the
        // single upload key.
        let name = path.file_name().and_then(|n| n.to_str()).ok_or_else(|| {
            Error::IoError(std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                "Invalid filename",
            ))
        })?;
        let total = path.metadata()?.len() as usize;
        let current = Arc::new(AtomicUsize::new(0));

        // Report 0/total immediately so consumers can render a bar before
        // the first byte is uploaded.  Send errors are ignored (best-effort).
        if let Some(progress) = &progress {
            let _ = progress.send(Progress { current: 0, total }).await;
        }

        // Ask the server to create the snapshot record and presigned
        // multipart upload URLs for the single file.
        let params = SnapshotCreateMultipartParams {
            snapshot_name: name.to_owned(),
            keys: vec![name.to_owned()],
            file_sizes: vec![total],
        };
        let multipart: HashMap<String, SnapshotCreateMultipartResultField> = self
            .rpc(
                "snapshots.create_upload_url_multipart".to_owned(),
                Some(params),
            )
            .await?;

        // The result map carries the snapshot id under a reserved key;
        // anything else is an upload part description.
        let snapshot_id = match multipart.get("snapshot_id") {
            Some(SnapshotCreateMultipartResultField::Id(id)) => SnapshotID::from(*id),
            _ => return Err(Error::InvalidResponse),
        };

        // The per-file entries in the multipart map are keyed by
        // "<prefix>/<filename>", where the prefix is the portion of the
        // snapshot's storage path after the "::/" separator.
        let snapshot = self.snapshot(snapshot_id).await?;
        let part_prefix = snapshot
            .path()
            .split("::/")
            .last()
            .ok_or(Error::InvalidResponse)?
            .to_owned();
        let part_key = format!("{}/{}", part_prefix, name);
        let mut part = match multipart.get(&part_key) {
            Some(SnapshotCreateMultipartResultField::Part(part)) => part,
            _ => return Err(Error::InvalidResponse),
        }
        .clone();
        part.key = Some(part_key);

        // Stream the file to S3 in chunks via the presigned URLs; returns
        // the parameters needed to finalize the multipart upload.
        let params = upload_multipart(
            self.http.clone(),
            part.clone(),
            path.to_path_buf(),
            total,
            current,
            progress.clone(),
        )
        .await?;

        // Tell the server to stitch the uploaded parts together.
        let complete: String = self
            .rpc(
                "snapshots.complete_multipart_upload".to_owned(),
                Some(params),
            )
            .await?;
        debug!("Snapshot Multipart Complete: {:?}", complete);

        // Mark the snapshot available now that all data has landed.
        let params: SnapshotStatusParams = SnapshotStatusParams {
            snapshot_id,
            status: "available".to_owned(),
        };
        let _: SnapshotStatusResult = self
            .rpc("snapshots.update".to_owned(), Some(params))
            .await?;

        // Drop the sender explicitly so the receiver sees the channel close
        // (upload finished) before the final snapshot fetch below.
        if let Some(progress) = progress {
            drop(progress);
        }

        self.snapshot(snapshot_id).await
    }
1909
    /// Upload an EdgeFirst Dataset directory as a snapshot.
    ///
    /// Walks the directory recursively, registers every file (keyed by its
    /// path relative to `path`) with the server in a single multipart-create
    /// call, then uploads each file sequentially through presigned URLs.
    /// Progress is aggregated across all files against the combined size.
    async fn create_snapshot_folder(
        &self,
        path: &str,
        progress: Option<Sender<Progress>>,
    ) -> Result<Snapshot, Error> {
        let path = Path::new(path);
        // The directory's basename becomes the snapshot name.
        let name = path.file_name().and_then(|n| n.to_str()).ok_or_else(|| {
            Error::IoError(std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                "Invalid directory name",
            ))
        })?;

        // Collect every regular file as a path relative to the root;
        // unreadable entries are silently skipped (best-effort walk).
        let files = WalkDir::new(path)
            .into_iter()
            .filter_map(|entry| entry.ok())
            .filter(|entry| entry.file_type().is_file())
            .filter_map(|entry| entry.path().strip_prefix(path).ok().map(|p| p.to_owned()))
            .collect::<Vec<_>>();

        // Combined byte count across all files, used for aggregate progress.
        let total: usize = files
            .iter()
            .filter_map(|file| path.join(file).metadata().ok())
            .map(|metadata| metadata.len() as usize)
            .sum();
        let current = Arc::new(AtomicUsize::new(0));

        // Report 0/total up front so consumers can render a bar immediately.
        if let Some(progress) = &progress {
            let _ = progress.send(Progress { current: 0, total }).await;
        }

        // Keys are the relative paths (UTF-8 only); sizes are gathered with
        // the same filter so the two vectors stay aligned.
        let keys = files
            .iter()
            .filter_map(|key| key.to_str().map(|s| s.to_owned()))
            .collect::<Vec<_>>();
        let file_sizes = files
            .iter()
            .filter_map(|key| path.join(key).metadata().ok())
            .map(|metadata| metadata.len() as usize)
            .collect::<Vec<_>>();

        let params = SnapshotCreateMultipartParams {
            snapshot_name: name.to_owned(),
            keys,
            file_sizes,
        };

        // One RPC creates the snapshot record plus presigned upload parts
        // for every registered file.
        let multipart: HashMap<String, SnapshotCreateMultipartResultField> = self
            .rpc(
                "snapshots.create_upload_url_multipart".to_owned(),
                Some(params),
            )
            .await?;

        let snapshot_id = match multipart.get("snapshot_id") {
            Some(SnapshotCreateMultipartResultField::Id(id)) => SnapshotID::from(*id),
            _ => return Err(Error::InvalidResponse),
        };

        // Per-file entries are keyed "<prefix>/<relative path>", where the
        // prefix is the snapshot storage path after the "::/" separator.
        let snapshot = self.snapshot(snapshot_id).await?;
        let part_prefix = snapshot
            .path()
            .split("::/")
            .last()
            .ok_or(Error::InvalidResponse)?
            .to_owned();

        // Upload and finalize each file in turn; `current`/`total` carry the
        // aggregate progress across the whole directory.
        for file in files {
            let file_str = file.to_str().ok_or_else(|| {
                Error::IoError(std::io::Error::new(
                    std::io::ErrorKind::InvalidInput,
                    "File path contains invalid UTF-8",
                ))
            })?;
            let part_key = format!("{}/{}", part_prefix, file_str);
            let mut part = match multipart.get(&part_key) {
                Some(SnapshotCreateMultipartResultField::Part(part)) => part,
                _ => return Err(Error::InvalidResponse),
            }
            .clone();
            part.key = Some(part_key);

            let params = upload_multipart(
                self.http.clone(),
                part.clone(),
                path.join(file),
                total,
                current.clone(),
                progress.clone(),
            )
            .await?;

            // Finalize this file's multipart upload before moving on.
            let complete: String = self
                .rpc(
                    "snapshots.complete_multipart_upload".to_owned(),
                    Some(params),
                )
                .await?;
            debug!("Snapshot Part Complete: {:?}", complete);
        }

        // All files uploaded: flip the snapshot status to available.
        let params = SnapshotStatusParams {
            snapshot_id,
            status: "available".to_owned(),
        };
        let _: SnapshotStatusResult = self
            .rpc("snapshots.update".to_owned(), Some(params))
            .await?;

        // Close the progress channel so receivers observe completion before
        // the final snapshot fetch.
        if let Some(progress) = progress {
            drop(progress);
        }

        self.snapshot(snapshot_id).await
    }
2025
2026    /// Delete a snapshot from EdgeFirst Studio.
2027    ///
2028    /// Permanently removes a snapshot and its associated data. This operation
2029    /// cannot be undone.
2030    ///
2031    /// # Arguments
2032    ///
2033    /// * `snapshot_id` - The snapshot ID to delete
2034    ///
2035    /// # Errors
2036    ///
2037    /// Returns an error if:
2038    /// * Snapshot doesn't exist
2039    /// * User lacks permission to delete the snapshot
2040    /// * Server error occurs
2041    ///
2042    /// # Example
2043    ///
2044    /// ```no_run
2045    /// # use edgefirst_client::{Client, SnapshotID};
2046    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
2047    /// let client = Client::new()?.with_token_path(None)?;
2048    /// let snapshot_id = SnapshotID::from(123);
2049    /// client.delete_snapshot(snapshot_id).await?;
2050    /// # Ok(())
2051    /// # }
2052    /// ```
2053    ///
2054    /// # See Also
2055    ///
2056    /// * [`create_snapshot`](Self::create_snapshot) - Upload snapshot
2057    /// * [`snapshots`](Self::snapshots) - List all snapshots
2058    pub async fn delete_snapshot(&self, snapshot_id: SnapshotID) -> Result<(), Error> {
2059        let params = HashMap::from([("snapshot_id", snapshot_id)]);
2060        let _: String = self
2061            .rpc("snapshots.delete".to_owned(), Some(params))
2062            .await?;
2063        Ok(())
2064    }
2065
2066    /// Download a snapshot from EdgeFirst Studio to local storage.
2067    ///
2068    /// Downloads all files in a snapshot (single MCAP file or directory of
2069    /// EdgeFirst Dataset Format files) to the specified output path. Files are
2070    /// downloaded concurrently with progress tracking.
2071    ///
2072    /// **Concurrency tuning**: Set `MAX_TASKS` to control concurrent
2073    /// downloads (default: half of CPU cores, min 2, max 8).
2074    ///
2075    /// # Arguments
2076    ///
2077    /// * `snapshot_id` - The snapshot ID to download
2078    /// * `output` - Local directory path to save downloaded files
2079    /// * `progress` - Optional channel to receive download progress updates
2080    ///
2081    /// # Errors
2082    ///
2083    /// Returns an error if:
2084    /// * Snapshot doesn't exist
2085    /// * Output directory cannot be created
2086    /// * Download fails or network error occurs
2087    ///
2088    /// # Example
2089    ///
2090    /// ```no_run
2091    /// # use edgefirst_client::{Client, SnapshotID, Progress};
2092    /// # use tokio::sync::mpsc;
2093    /// # use std::path::PathBuf;
2094    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
2095    /// let client = Client::new()?.with_token_path(None)?;
2096    /// let snapshot_id = SnapshotID::from(123);
2097    ///
2098    /// // Download with progress tracking
2099    /// let (tx, mut rx) = mpsc::channel(1);
2100    /// tokio::spawn(async move {
2101    ///     while let Some(Progress { current, total }) = rx.recv().await {
2102    ///         println!("Download: {}/{} bytes", current, total);
2103    ///     }
2104    /// });
2105    /// client
2106    ///     .download_snapshot(snapshot_id, PathBuf::from("./output"), Some(tx))
2107    ///     .await?;
2108    /// # Ok(())
2109    /// # }
2110    /// ```
2111    ///
2112    /// # See Also
2113    ///
2114    /// * [`create_snapshot`](Self::create_snapshot) - Upload snapshot
2115    /// * [`restore_snapshot`](Self::restore_snapshot) - Restore snapshot to
2116    ///   dataset
2117    /// * [`delete_snapshot`](Self::delete_snapshot) - Delete snapshot
2118    pub async fn download_snapshot(
2119        &self,
2120        snapshot_id: SnapshotID,
2121        output: PathBuf,
2122        progress: Option<Sender<Progress>>,
2123    ) -> Result<(), Error> {
2124        fs::create_dir_all(&output).await?;
2125
2126        let params = HashMap::from([("snapshot_id", snapshot_id)]);
2127        let items: HashMap<String, String> = self
2128            .rpc("snapshots.create_download_url".to_owned(), Some(params))
2129            .await?;
2130
2131        let total = Arc::new(AtomicUsize::new(0));
2132        let current = Arc::new(AtomicUsize::new(0));
2133        let sem = Arc::new(Semaphore::new(max_tasks()));
2134
2135        let tasks = items
2136            .iter()
2137            .map(|(key, url)| {
2138                let http = self.http.clone();
2139                let key = key.clone();
2140                let url = url.clone();
2141                let output = output.clone();
2142                let progress = progress.clone();
2143                let current = current.clone();
2144                let total = total.clone();
2145                let sem = sem.clone();
2146
2147                tokio::spawn(async move {
2148                    let _permit = sem.acquire().await.map_err(|_| {
2149                        Error::IoError(std::io::Error::other("Semaphore closed unexpectedly"))
2150                    })?;
2151                    let res = http.get(url).send().await?;
2152                    let content_length = res.content_length().unwrap_or(0) as usize;
2153
2154                    if let Some(progress) = &progress {
2155                        let total = total.fetch_add(content_length, Ordering::SeqCst);
2156                        let _ = progress
2157                            .send(Progress {
2158                                current: current.load(Ordering::SeqCst),
2159                                total: total + content_length,
2160                            })
2161                            .await;
2162                    }
2163
2164                    let mut file = File::create(output.join(key)).await?;
2165                    let mut stream = res.bytes_stream();
2166
2167                    while let Some(chunk) = stream.next().await {
2168                        let chunk = chunk?;
2169                        file.write_all(&chunk).await?;
2170                        let len = chunk.len();
2171
2172                        if let Some(progress) = &progress {
2173                            let total = total.load(Ordering::SeqCst);
2174                            let current = current.fetch_add(len, Ordering::SeqCst);
2175
2176                            let _ = progress
2177                                .send(Progress {
2178                                    current: current + len,
2179                                    total,
2180                                })
2181                                .await;
2182                        }
2183                    }
2184
2185                    Ok::<(), Error>(())
2186                })
2187            })
2188            .collect::<Vec<_>>();
2189
2190        join_all(tasks)
2191            .await
2192            .into_iter()
2193            .collect::<Result<Vec<_>, _>>()?
2194            .into_iter()
2195            .collect::<Result<Vec<_>, _>>()?;
2196
2197        Ok(())
2198    }
2199
2200    /// Restore a snapshot to a dataset in EdgeFirst Studio with optional AGTG.
2201    ///
2202    /// Restores a snapshot (MCAP file or EdgeFirst Dataset) into a dataset in
2203    /// the specified project. For MCAP files, supports:
2204    ///
2205    /// * **AGTG (Automatic Ground Truth Generation)**: Automatically annotate
2206    ///   detected objects with 2D masks/boxes and 3D boxes (if radar/LiDAR
2207    ///   present)
2208    /// * **Auto-depth**: Generate depthmaps (Maivin/Raivin cameras only)
2209    /// * **Topic filtering**: Select specific MCAP topics to restore
2210    ///
2211    /// For EdgeFirst Dataset snapshots, this simply imports the pre-existing
2212    /// dataset structure.
2213    ///
2214    /// # Arguments
2215    ///
2216    /// * `project_id` - Target project ID
2217    /// * `snapshot_id` - Snapshot ID to restore
2218    /// * `topics` - MCAP topics to include (empty = all topics)
2219    /// * `autolabel` - Object labels for AGTG (empty = no auto-annotation)
2220    /// * `autodepth` - Generate depthmaps (Maivin/Raivin only)
2221    /// * `dataset_name` - Optional custom dataset name
2222    /// * `dataset_description` - Optional dataset description
2223    ///
2224    /// # Returns
2225    ///
2226    /// Returns a `SnapshotRestoreResult` with the new dataset ID and status.
2227    ///
2228    /// # Errors
2229    ///
2230    /// Returns an error if:
2231    /// * Snapshot or project doesn't exist
2232    /// * Snapshot format is invalid
2233    /// * Server rejects restoration parameters
2234    ///
2235    /// # Example
2236    ///
2237    /// ```no_run
2238    /// # use edgefirst_client::{Client, ProjectID, SnapshotID};
2239    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
2240    /// let client = Client::new()?.with_token_path(None)?;
2241    /// let project_id = ProjectID::from(1);
2242    /// let snapshot_id = SnapshotID::from(123);
2243    ///
2244    /// // Restore MCAP with AGTG for "person" and "car" detection
2245    /// let result = client
2246    ///     .restore_snapshot(
2247    ///         project_id,
2248    ///         snapshot_id,
2249    ///         &[],                                        // All topics
2250    ///         &["person".to_string(), "car".to_string()], // AGTG labels
2251    ///         true,                                       // Auto-depth
2252    ///         Some("Highway Dataset"),
2253    ///         Some("Collected on I-95"),
2254    ///     )
2255    ///     .await?;
2256    /// println!("Restored to dataset: {:?}", result.dataset_id);
2257    /// # Ok(())
2258    /// # }
2259    /// ```
2260    ///
2261    /// # See Also
2262    ///
2263    /// * [`create_snapshot`](Self::create_snapshot) - Upload snapshot
2264    /// * [`download_snapshot`](Self::download_snapshot) - Download snapshot
2265    /// * [AGTG Documentation](https://doc.edgefirst.ai/latest/datasets/tutorials/annotations/automatic/)
2266    #[allow(clippy::too_many_arguments)]
2267    pub async fn restore_snapshot(
2268        &self,
2269        project_id: ProjectID,
2270        snapshot_id: SnapshotID,
2271        topics: &[String],
2272        autolabel: &[String],
2273        autodepth: bool,
2274        dataset_name: Option<&str>,
2275        dataset_description: Option<&str>,
2276    ) -> Result<SnapshotRestoreResult, Error> {
2277        let params = SnapshotRestore {
2278            project_id,
2279            snapshot_id,
2280            fps: 1,
2281            autodepth,
2282            agtg_pipeline: !autolabel.is_empty(),
2283            autolabel: autolabel.to_vec(),
2284            topics: topics.to_vec(),
2285            dataset_name: dataset_name.map(|s| s.to_owned()),
2286            dataset_description: dataset_description.map(|s| s.to_owned()),
2287        };
2288        self.rpc("snapshots.restore".to_owned(), Some(params)).await
2289    }
2290
2291    /// Returns a list of experiments available to the user.  The experiments
2292    /// are returned as a vector of Experiment objects.  If name is provided
2293    /// then only experiments containing this string are returned.
2294    ///
2295    /// Experiments provide a method of organizing training and validation
2296    /// sessions together and are akin to an Experiment in MLFlow terminology.  
2297    /// Each experiment can have multiple trainer sessions associated with it,
2298    /// these would be akin to runs in MLFlow terminology.
2299    pub async fn experiments(
2300        &self,
2301        project_id: ProjectID,
2302        name: Option<&str>,
2303    ) -> Result<Vec<Experiment>, Error> {
2304        let params = HashMap::from([("project_id", project_id)]);
2305        let experiments: Vec<Experiment> =
2306            self.rpc("trainer.list2".to_owned(), Some(params)).await?;
2307        if let Some(name) = name {
2308            Ok(experiments
2309                .into_iter()
2310                .filter(|e| e.name().contains(name))
2311                .collect())
2312        } else {
2313            Ok(experiments)
2314        }
2315    }
2316
2317    /// Return the experiment with the specified experiment ID.  If the
2318    /// experiment does not exist, an error is returned.
2319    pub async fn experiment(&self, experiment_id: ExperimentID) -> Result<Experiment, Error> {
2320        let params = HashMap::from([("trainer_id", experiment_id)]);
2321        self.rpc("trainer.get".to_owned(), Some(params)).await
2322    }
2323
2324    /// Returns a list of trainer sessions available to the user.  The trainer
2325    /// sessions are returned as a vector of TrainingSession objects.  If name
2326    /// is provided then only trainer sessions containing this string are
2327    /// returned.
2328    ///
2329    /// Trainer sessions are akin to runs in MLFlow terminology.  These
2330    /// represent an actual training session which will produce metrics and
2331    /// model artifacts.
2332    pub async fn training_sessions(
2333        &self,
2334        experiment_id: ExperimentID,
2335        name: Option<&str>,
2336    ) -> Result<Vec<TrainingSession>, Error> {
2337        let params = HashMap::from([("trainer_id", experiment_id)]);
2338        let sessions: Vec<TrainingSession> = self
2339            .rpc("trainer.session.list".to_owned(), Some(params))
2340            .await?;
2341        if let Some(name) = name {
2342            Ok(sessions
2343                .into_iter()
2344                .filter(|s| s.name().contains(name))
2345                .collect())
2346        } else {
2347            Ok(sessions)
2348        }
2349    }
2350
2351    /// Return the trainer session with the specified trainer session ID.  If
2352    /// the trainer session does not exist, an error is returned.
2353    pub async fn training_session(
2354        &self,
2355        session_id: TrainingSessionID,
2356    ) -> Result<TrainingSession, Error> {
2357        let params = HashMap::from([("trainer_session_id", session_id)]);
2358        self.rpc("trainer.session.get".to_owned(), Some(params))
2359            .await
2360    }
2361
2362    /// List validation sessions for the given project.
2363    pub async fn validation_sessions(
2364        &self,
2365        project_id: ProjectID,
2366    ) -> Result<Vec<ValidationSession>, Error> {
2367        let params = HashMap::from([("project_id", project_id)]);
2368        self.rpc("validate.session.list".to_owned(), Some(params))
2369            .await
2370    }
2371
2372    /// Retrieve a specific validation session.
2373    pub async fn validation_session(
2374        &self,
2375        session_id: ValidationSessionID,
2376    ) -> Result<ValidationSession, Error> {
2377        let params = HashMap::from([("validate_session_id", session_id)]);
2378        self.rpc("validate.session.get".to_owned(), Some(params))
2379            .await
2380    }
2381
2382    /// List the artifacts for the specified trainer session.  The artifacts
2383    /// are returned as a vector of strings.
2384    pub async fn artifacts(
2385        &self,
2386        training_session_id: TrainingSessionID,
2387    ) -> Result<Vec<Artifact>, Error> {
2388        let params = HashMap::from([("training_session_id", training_session_id)]);
2389        self.rpc("trainer.get_artifacts".to_owned(), Some(params))
2390            .await
2391    }
2392
2393    /// Download the model artifact for the specified trainer session to the
2394    /// specified file path, if path is not provided it will be downloaded to
2395    /// the current directory with the same filename.  A progress callback can
2396    /// be provided to monitor the progress of the download over a watch
2397    /// channel.
2398    pub async fn download_artifact(
2399        &self,
2400        training_session_id: TrainingSessionID,
2401        modelname: &str,
2402        filename: Option<PathBuf>,
2403        progress: Option<Sender<Progress>>,
2404    ) -> Result<(), Error> {
2405        let filename = filename.unwrap_or_else(|| PathBuf::from(modelname));
2406        let resp = self
2407            .http
2408            .get(format!(
2409                "{}/download_model?training_session_id={}&file={}",
2410                self.url,
2411                training_session_id.value(),
2412                modelname
2413            ))
2414            .header("Authorization", format!("Bearer {}", self.token().await))
2415            .send()
2416            .await?;
2417        if !resp.status().is_success() {
2418            let err = resp.error_for_status_ref().unwrap_err();
2419            return Err(Error::HttpError(err));
2420        }
2421
2422        if let Some(parent) = filename.parent() {
2423            fs::create_dir_all(parent).await?;
2424        }
2425
2426        if let Some(progress) = progress {
2427            let total = resp.content_length().unwrap_or(0) as usize;
2428            let _ = progress.send(Progress { current: 0, total }).await;
2429
2430            let mut file = File::create(filename).await?;
2431            let mut current = 0;
2432            let mut stream = resp.bytes_stream();
2433
2434            while let Some(item) = stream.next().await {
2435                let chunk = item?;
2436                file.write_all(&chunk).await?;
2437                current += chunk.len();
2438                let _ = progress.send(Progress { current, total }).await;
2439            }
2440        } else {
2441            let body = resp.bytes().await?;
2442            fs::write(filename, body).await?;
2443        }
2444
2445        Ok(())
2446    }
2447
2448    /// Download the model checkpoint associated with the specified trainer
2449    /// session to the specified file path, if path is not provided it will be
2450    /// downloaded to the current directory with the same filename.  A progress
2451    /// callback can be provided to monitor the progress of the download over a
2452    /// watch channel.
2453    ///
2454    /// There is no API for listing checkpoints it is expected that trainers are
2455    /// aware of possible checkpoints and their names within the checkpoint
2456    /// folder on the server.
2457    pub async fn download_checkpoint(
2458        &self,
2459        training_session_id: TrainingSessionID,
2460        checkpoint: &str,
2461        filename: Option<PathBuf>,
2462        progress: Option<Sender<Progress>>,
2463    ) -> Result<(), Error> {
2464        let filename = filename.unwrap_or_else(|| PathBuf::from(checkpoint));
2465        let resp = self
2466            .http
2467            .get(format!(
2468                "{}/download_checkpoint?folder=checkpoints&training_session_id={}&file={}",
2469                self.url,
2470                training_session_id.value(),
2471                checkpoint
2472            ))
2473            .header("Authorization", format!("Bearer {}", self.token().await))
2474            .send()
2475            .await?;
2476        if !resp.status().is_success() {
2477            let err = resp.error_for_status_ref().unwrap_err();
2478            return Err(Error::HttpError(err));
2479        }
2480
2481        if let Some(parent) = filename.parent() {
2482            fs::create_dir_all(parent).await?;
2483        }
2484
2485        if let Some(progress) = progress {
2486            let total = resp.content_length().unwrap_or(0) as usize;
2487            let _ = progress.send(Progress { current: 0, total }).await;
2488
2489            let mut file = File::create(filename).await?;
2490            let mut current = 0;
2491            let mut stream = resp.bytes_stream();
2492
2493            while let Some(item) = stream.next().await {
2494                let chunk = item?;
2495                file.write_all(&chunk).await?;
2496                current += chunk.len();
2497                let _ = progress.send(Progress { current, total }).await;
2498            }
2499        } else {
2500            let body = resp.bytes().await?;
2501            fs::write(filename, body).await?;
2502        }
2503
2504        Ok(())
2505    }
2506
2507    /// Return a list of tasks for the current user.
2508    ///
2509    /// # Arguments
2510    ///
2511    /// * `name` - Optional filter for task name (client-side substring match)
2512    /// * `workflow` - Optional filter for workflow/task type. If provided,
2513    ///   filters server-side by exact match. Valid values include: "trainer",
2514    ///   "validation", "snapshot-create", "snapshot-restore", "copyds",
2515    ///   "upload", "auto-ann", "auto-seg", "aigt", "import", "export",
2516    ///   "convertor", "twostage"
2517    /// * `status` - Optional filter for task status (e.g., "running",
2518    ///   "complete", "error")
2519    /// * `manager` - Optional filter for task manager type (e.g., "aws",
2520    ///   "user", "kubernetes")
2521    pub async fn tasks(
2522        &self,
2523        name: Option<&str>,
2524        workflow: Option<&str>,
2525        status: Option<&str>,
2526        manager: Option<&str>,
2527    ) -> Result<Vec<Task>, Error> {
2528        let mut params = TasksListParams {
2529            continue_token: None,
2530            types: workflow.map(|w| vec![w.to_owned()]),
2531            status: status.map(|s| vec![s.to_owned()]),
2532            manager: manager.map(|m| vec![m.to_owned()]),
2533        };
2534        let mut tasks = Vec::new();
2535
2536        loop {
2537            let result = self
2538                .rpc::<_, TasksListResult>("task.list".to_owned(), Some(&params))
2539                .await?;
2540            tasks.extend(result.tasks);
2541
2542            if result.continue_token.is_none() || result.continue_token == Some("".into()) {
2543                params.continue_token = None;
2544            } else {
2545                params.continue_token = result.continue_token;
2546            }
2547
2548            if params.continue_token.is_none() {
2549                break;
2550            }
2551        }
2552
2553        if let Some(name) = name {
2554            tasks.retain(|t| t.name().contains(name));
2555        }
2556
2557        Ok(tasks)
2558    }
2559
2560    /// Retrieve the task information and status.
2561    pub async fn task_info(&self, task_id: TaskID) -> Result<TaskInfo, Error> {
2562        self.rpc(
2563            "task.get".to_owned(),
2564            Some(HashMap::from([("id", task_id)])),
2565        )
2566        .await
2567    }
2568
2569    /// Updates the tasks status.
2570    pub async fn task_status(&self, task_id: TaskID, status: &str) -> Result<Task, Error> {
2571        let status = TaskStatus {
2572            task_id,
2573            status: status.to_owned(),
2574        };
2575        self.rpc("docker.update.status".to_owned(), Some(status))
2576            .await
2577    }
2578
2579    /// Defines the stages for the task.  The stages are defined as a mapping
2580    /// from stage names to their descriptions.  Once stages are defined their
2581    /// status can be updated using the update_stage method.
2582    pub async fn set_stages(&self, task_id: TaskID, stages: &[(&str, &str)]) -> Result<(), Error> {
2583        let stages: Vec<HashMap<String, String>> = stages
2584            .iter()
2585            .map(|(key, value)| {
2586                let mut stage_map = HashMap::new();
2587                stage_map.insert(key.to_string(), value.to_string());
2588                stage_map
2589            })
2590            .collect();
2591        let params = TaskStages { task_id, stages };
2592        let _: Task = self.rpc("status.stages".to_owned(), Some(params)).await?;
2593        Ok(())
2594    }
2595
2596    /// Updates the progress of the task for the provided stage and status
2597    /// information.
2598    pub async fn update_stage(
2599        &self,
2600        task_id: TaskID,
2601        stage: &str,
2602        status: &str,
2603        message: &str,
2604        percentage: u8,
2605    ) -> Result<(), Error> {
2606        let stage = Stage::new(
2607            Some(task_id),
2608            stage.to_owned(),
2609            Some(status.to_owned()),
2610            Some(message.to_owned()),
2611            percentage,
2612        );
2613        let _: Task = self.rpc("status.update".to_owned(), Some(stage)).await?;
2614        Ok(())
2615    }
2616
2617    /// Raw fetch from the Studio server is used for downloading files.
2618    pub async fn fetch(&self, query: &str) -> Result<Vec<u8>, Error> {
2619        let req = self
2620            .http
2621            .get(format!("{}/{}", self.url, query))
2622            .header("User-Agent", "EdgeFirst Client")
2623            .header("Authorization", format!("Bearer {}", self.token().await));
2624        let resp = req.send().await?;
2625
2626        if resp.status().is_success() {
2627            let body = resp.bytes().await?;
2628
2629            if log_enabled!(Level::Trace) {
2630                trace!("Fetch Response: {}", String::from_utf8_lossy(&body));
2631            }
2632
2633            Ok(body.to_vec())
2634        } else {
2635            let err = resp.error_for_status_ref().unwrap_err();
2636            Err(Error::HttpError(err))
2637        }
2638    }
2639
2640    /// Sends a multipart post request to the server.  This is used by the
2641    /// upload and download APIs which do not use JSON-RPC but instead transfer
2642    /// files using multipart/form-data.
2643    pub async fn post_multipart(&self, method: &str, form: Form) -> Result<String, Error> {
2644        let req = self
2645            .http
2646            .post(format!("{}/api?method={}", self.url, method))
2647            .header("Accept", "application/json")
2648            .header("User-Agent", "EdgeFirst Client")
2649            .header("Authorization", format!("Bearer {}", self.token().await))
2650            .multipart(form);
2651        let resp = req.send().await?;
2652
2653        if resp.status().is_success() {
2654            let body = resp.bytes().await?;
2655
2656            if log_enabled!(Level::Trace) {
2657                trace!(
2658                    "POST Multipart Response: {}",
2659                    String::from_utf8_lossy(&body)
2660                );
2661            }
2662
2663            let response: RpcResponse<String> = match serde_json::from_slice(&body) {
2664                Ok(response) => response,
2665                Err(err) => {
2666                    error!("Invalid JSON Response: {}", String::from_utf8_lossy(&body));
2667                    return Err(err.into());
2668                }
2669            };
2670
2671            if let Some(error) = response.error {
2672                Err(Error::RpcError(error.code, error.message))
2673            } else if let Some(result) = response.result {
2674                Ok(result)
2675            } else {
2676                Err(Error::InvalidResponse)
2677            }
2678        } else {
2679            let err = resp.error_for_status_ref().unwrap_err();
2680            Err(Error::HttpError(err))
2681        }
2682    }
2683
2684    /// Send a JSON-RPC request to the server.  The method is the name of the
2685    /// method to call on the server.  The params are the parameters to pass to
2686    /// the method.  The method and params are serialized into a JSON-RPC
2687    /// request and sent to the server.  The response is deserialized into
2688    /// the specified type and returned to the caller.
2689    ///
2690    /// NOTE: This API would generally not be called directly and instead users
2691    /// should use the higher-level methods provided by the client.
2692    pub async fn rpc<Params, RpcResult>(
2693        &self,
2694        method: String,
2695        params: Option<Params>,
2696    ) -> Result<RpcResult, Error>
2697    where
2698        Params: Serialize,
2699        RpcResult: DeserializeOwned,
2700    {
2701        let auth_expires = self.token_expiration().await?;
2702        if auth_expires <= Utc::now() + Duration::from_secs(3600) {
2703            self.renew_token().await?;
2704        }
2705
2706        self.rpc_without_auth(method, params).await
2707    }
2708
2709    async fn rpc_without_auth<Params, RpcResult>(
2710        &self,
2711        method: String,
2712        params: Option<Params>,
2713    ) -> Result<RpcResult, Error>
2714    where
2715        Params: Serialize,
2716        RpcResult: DeserializeOwned,
2717    {
2718        let request = RpcRequest {
2719            method,
2720            params,
2721            ..Default::default()
2722        };
2723
2724        if log_enabled!(Level::Trace) {
2725            trace!(
2726                "RPC Request: {}",
2727                serde_json::ser::to_string_pretty(&request)?
2728            );
2729        }
2730
2731        let url = format!("{}/api", self.url);
2732
2733        // Use client-level timeout (allows retry mechanism to work properly)
2734        // Per-request timeout overrides can prevent retries from functioning
2735        let res = self
2736            .http
2737            .post(&url)
2738            .header("Accept", "application/json")
2739            .header("User-Agent", "EdgeFirst Client")
2740            .header("Authorization", format!("Bearer {}", self.token().await))
2741            .json(&request)
2742            .send()
2743            .await?;
2744
2745        self.process_rpc_response(res).await
2746    }
2747
2748    async fn process_rpc_response<RpcResult>(
2749        &self,
2750        res: reqwest::Response,
2751    ) -> Result<RpcResult, Error>
2752    where
2753        RpcResult: DeserializeOwned,
2754    {
2755        let body = res.bytes().await?;
2756
2757        if log_enabled!(Level::Trace) {
2758            trace!("RPC Response: {}", String::from_utf8_lossy(&body));
2759        }
2760
2761        let response: RpcResponse<RpcResult> = match serde_json::from_slice(&body) {
2762            Ok(response) => response,
2763            Err(err) => {
2764                error!("Invalid JSON Response: {}", String::from_utf8_lossy(&body));
2765                return Err(err.into());
2766            }
2767        };
2768
2769        // FIXME: Studio Server always returns 999 as the id.
2770        // if request.id.to_string() != response.id {
2771        //     return Err(Error::InvalidRpcId(response.id));
2772        // }
2773
2774        if let Some(error) = response.error {
2775            Err(Error::RpcError(error.code, error.message))
2776        } else if let Some(result) = response.result {
2777            Ok(result)
2778        } else {
2779            Err(Error::InvalidResponse)
2780        }
2781    }
2782}
2783
2784/// Process items in parallel with semaphore concurrency control and progress
2785/// tracking.
2786///
2787/// This helper eliminates boilerplate for parallel item processing with:
2788/// - Semaphore limiting concurrent tasks to `max_tasks()` (configurable via
2789///   `MAX_TASKS` environment variable, default: half of CPU cores, min 2, max
2790///   8)
2791/// - Atomic progress counter with automatic item-level updates
2792/// - Progress updates sent after each item completes (not byte-level streaming)
2793/// - Proper error propagation from spawned tasks
2794///
2795/// Note: This is optimized for discrete items with post-completion progress
2796/// updates. For byte-level streaming progress or custom retry logic, use
2797/// specialized implementations.
2798///
2799/// # Arguments
2800///
2801/// * `items` - Collection of items to process in parallel
2802/// * `progress` - Optional progress channel for tracking completion
2803/// * `work_fn` - Async function to execute for each item
2804///
2805/// # Examples
2806///
2807/// ```rust,ignore
2808/// parallel_foreach_items(samples, progress, |sample| async move {
2809///     // Process sample
2810///     sample.download(&client, file_type).await?;
2811///     Ok(())
2812/// }).await?;
2813/// ```
2814async fn parallel_foreach_items<T, F, Fut>(
2815    items: Vec<T>,
2816    progress: Option<Sender<Progress>>,
2817    work_fn: F,
2818) -> Result<(), Error>
2819where
2820    T: Send + 'static,
2821    F: Fn(T) -> Fut + Send + Sync + 'static,
2822    Fut: Future<Output = Result<(), Error>> + Send + 'static,
2823{
2824    let total = items.len();
2825    let current = Arc::new(AtomicUsize::new(0));
2826    let sem = Arc::new(Semaphore::new(max_tasks()));
2827    let work_fn = Arc::new(work_fn);
2828
2829    let tasks = items
2830        .into_iter()
2831        .map(|item| {
2832            let sem = sem.clone();
2833            let current = current.clone();
2834            let progress = progress.clone();
2835            let work_fn = work_fn.clone();
2836
2837            tokio::spawn(async move {
2838                let _permit = sem.acquire().await.map_err(|_| {
2839                    Error::IoError(std::io::Error::other("Semaphore closed unexpectedly"))
2840                })?;
2841
2842                // Execute the actual work
2843                work_fn(item).await?;
2844
2845                // Update progress
2846                if let Some(progress) = &progress {
2847                    let current = current.fetch_add(1, Ordering::SeqCst);
2848                    let _ = progress
2849                        .send(Progress {
2850                            current: current + 1,
2851                            total,
2852                        })
2853                        .await;
2854                }
2855
2856                Ok::<(), Error>(())
2857            })
2858        })
2859        .collect::<Vec<_>>();
2860
2861    join_all(tasks)
2862        .await
2863        .into_iter()
2864        .collect::<Result<Vec<_>, _>>()?
2865        .into_iter()
2866        .collect::<Result<Vec<_>, _>>()?;
2867
2868    if let Some(progress) = progress {
2869        drop(progress);
2870    }
2871
2872    Ok(())
2873}
2874
2875/// Upload a file to S3 using multipart upload with presigned URLs.
2876///
2877/// Splits a file into chunks (100MB each) and uploads them in parallel using
2878/// S3 multipart upload protocol. Returns completion parameters with ETags for
2879/// finalizing the upload.
2880///
2881/// This function handles:
2882/// - Splitting files into parts based on PART_SIZE (100MB)
2883/// - Parallel upload with concurrency limiting via `max_tasks()` (configurable
2884///   with `MAX_TASKS`, default: half of CPU cores, min 2, max 8)
2885/// - Retry logic (handled by reqwest client)
2886/// - Progress tracking across all parts
2887///
2888/// # Arguments
2889///
2890/// * `http` - HTTP client for making requests
2891/// * `part` - Snapshot part info with presigned URLs for each chunk
2892/// * `path` - Local file path to upload
2893/// * `total` - Total bytes across all files for progress calculation
2894/// * `current` - Atomic counter tracking bytes uploaded across all operations
2895/// * `progress` - Optional channel for sending progress updates
2896///
2897/// # Returns
2898///
2899/// Parameters needed to complete the multipart upload (key, upload_id, ETags)
2900async fn upload_multipart(
2901    http: reqwest::Client,
2902    part: SnapshotPart,
2903    path: PathBuf,
2904    total: usize,
2905    current: Arc<AtomicUsize>,
2906    progress: Option<Sender<Progress>>,
2907) -> Result<SnapshotCompleteMultipartParams, Error> {
2908    let filesize = path.metadata()?.len() as usize;
2909    let n_parts = filesize.div_ceil(PART_SIZE);
2910    let sem = Arc::new(Semaphore::new(max_tasks()));
2911
2912    let key = part.key.ok_or(Error::InvalidResponse)?;
2913    let upload_id = part.upload_id;
2914
2915    let urls = part.urls.clone();
2916    // Pre-allocate ETag slots for all parts
2917    let etags = Arc::new(tokio::sync::Mutex::new(vec![
2918        EtagPart {
2919            etag: "".to_owned(),
2920            part_number: 0,
2921        };
2922        n_parts
2923    ]));
2924
2925    // Upload all parts in parallel with concurrency limiting
2926    let tasks = (0..n_parts)
2927        .map(|part| {
2928            let http = http.clone();
2929            let url = urls[part].clone();
2930            let etags = etags.clone();
2931            let path = path.to_owned();
2932            let sem = sem.clone();
2933            let progress = progress.clone();
2934            let current = current.clone();
2935
2936            tokio::spawn(async move {
2937                // Acquire semaphore permit to limit concurrent uploads
2938                let _permit = sem.acquire().await?;
2939
2940                // Upload part (retry is handled by reqwest client)
2941                let etag =
2942                    upload_part(http.clone(), url.clone(), path.clone(), part, n_parts).await?;
2943
2944                // Store ETag for this part (needed to complete multipart upload)
2945                let mut etags = etags.lock().await;
2946                etags[part] = EtagPart {
2947                    etag,
2948                    part_number: part + 1,
2949                };
2950
2951                // Update progress counter
2952                let current = current.fetch_add(PART_SIZE, Ordering::SeqCst);
2953                if let Some(progress) = &progress {
2954                    let _ = progress
2955                        .send(Progress {
2956                            current: current + PART_SIZE,
2957                            total,
2958                        })
2959                        .await;
2960                }
2961
2962                Ok::<(), Error>(())
2963            })
2964        })
2965        .collect::<Vec<_>>();
2966
2967    // Wait for all parts to complete
2968    join_all(tasks)
2969        .await
2970        .into_iter()
2971        .collect::<Result<Vec<_>, _>>()?;
2972
2973    Ok(SnapshotCompleteMultipartParams {
2974        key,
2975        upload_id,
2976        etag_list: etags.lock().await.clone(),
2977    })
2978}
2979
2980async fn upload_part(
2981    http: reqwest::Client,
2982    url: String,
2983    path: PathBuf,
2984    part: usize,
2985    n_parts: usize,
2986) -> Result<String, Error> {
2987    let filesize = path.metadata()?.len() as usize;
2988    let mut file = File::open(path).await?;
2989    file.seek(SeekFrom::Start((part * PART_SIZE) as u64))
2990        .await?;
2991    let file = file.take(PART_SIZE as u64);
2992
2993    let body_length = if part + 1 == n_parts {
2994        filesize % PART_SIZE
2995    } else {
2996        PART_SIZE
2997    };
2998
2999    let stream = FramedRead::new(file, BytesCodec::new());
3000    let body = Body::wrap_stream(stream);
3001
3002    let resp = http
3003        .put(url.clone())
3004        .header(CONTENT_LENGTH, body_length)
3005        .body(body)
3006        .send()
3007        .await?
3008        .error_for_status()?;
3009
3010    let etag = resp
3011        .headers()
3012        .get("etag")
3013        .ok_or_else(|| Error::InvalidEtag("Missing ETag header".to_string()))?
3014        .to_str()
3015        .map_err(|_| Error::InvalidEtag("Invalid ETag encoding".to_string()))?
3016        .to_owned();
3017
3018    // Studio Server requires etag without the quotes.
3019    let etag = etag
3020        .strip_prefix("\"")
3021        .ok_or_else(|| Error::InvalidEtag("Missing opening quote".to_string()))?;
3022    let etag = etag
3023        .strip_suffix("\"")
3024        .ok_or_else(|| Error::InvalidEtag("Missing closing quote".to_string()))?;
3025
3026    Ok(etag.to_owned())
3027}
3028
3029/// Upload a complete file to a presigned S3 URL using HTTP PUT.
3030///
3031/// This is used for populate_samples to upload files to S3 after
3032/// receiving presigned URLs from the server.
3033async fn upload_file_to_presigned_url(
3034    http: reqwest::Client,
3035    url: &str,
3036    path: PathBuf,
3037) -> Result<(), Error> {
3038    // Read the entire file into memory
3039    let file_data = fs::read(&path).await?;
3040    let file_size = file_data.len();
3041
3042    // Upload (retry is handled by reqwest client)
3043    let resp = http
3044        .put(url)
3045        .header(CONTENT_LENGTH, file_size)
3046        .body(file_data)
3047        .send()
3048        .await?;
3049
3050    if resp.status().is_success() {
3051        debug!(
3052            "Successfully uploaded file: {:?} ({} bytes)",
3053            path, file_size
3054        );
3055        Ok(())
3056    } else {
3057        let status = resp.status();
3058        let error_text = resp.text().await.unwrap_or_default();
3059        Err(Error::InvalidParameters(format!(
3060            "Upload failed: HTTP {} - {}",
3061            status, error_text
3062        )))
3063    }
3064}
3065
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_build_filename_no_flatten() {
        // flatten=false leaves the base name untouched regardless of
        // sequence or frame information.
        let seq = "seq".to_string();
        assert_eq!(
            Client::build_filename("image.jpg", false, Some(&seq), Some(42)),
            "image.jpg"
        );
        assert_eq!(
            Client::build_filename("test.png", false, None, None),
            "test.png"
        );
    }

    #[test]
    fn test_build_filename_flatten_no_sequence() {
        // flatten=true without a sequence has nothing to prefix.
        assert_eq!(
            Client::build_filename("standalone.jpg", true, None, None),
            "standalone.jpg"
        );
    }

    #[test]
    fn test_build_filename_flatten_with_sequence_not_prefixed() {
        // flatten=true inside a sequence prefixes "<sequence>_<frame>_".
        let seq = "deer_sequence".to_string();
        let result = Client::build_filename("image.camera.jpeg", true, Some(&seq), Some(42));
        assert_eq!(result, "deer_sequence_42_image.camera.jpeg");
    }

    #[test]
    fn test_build_filename_flatten_with_sequence_no_frame() {
        // Without a frame number only the sequence name is prefixed.
        let seq = "sequence_A".to_string();
        let result = Client::build_filename("image.jpg", true, Some(&seq), None);
        assert_eq!(result, "sequence_A_image.jpg");
    }

    #[test]
    fn test_build_filename_flatten_already_prefixed() {
        // A name already starting with "<sequence>_" is returned unchanged.
        let seq = "deer_sequence".to_string();
        let result =
            Client::build_filename("deer_sequence_042.camera.jpeg", true, Some(&seq), Some(42));
        assert_eq!(result, "deer_sequence_042.camera.jpeg");
    }

    #[test]
    fn test_build_filename_flatten_already_prefixed_different_frame() {
        // An existing sequence prefix wins even when a different frame
        // number is supplied.
        let seq = "sequence_A".to_string();
        let result = Client::build_filename("sequence_A_001.jpg", true, Some(&seq), Some(2));
        assert_eq!(result, "sequence_A_001.jpg");
    }

    #[test]
    fn test_build_filename_flatten_partial_match() {
        // The sequence name appearing mid-string does not count as a prefix,
        // so the prefix is still added.
        let seq = "sequence_A".to_string();
        let result =
            Client::build_filename("test_sequence_A_image.jpg", true, Some(&seq), Some(5));
        assert_eq!(result, "sequence_A_5_test_sequence_A_image.jpg");
    }

    #[test]
    fn test_build_filename_flatten_preserves_extension() {
        // Prefixing must never disturb the (possibly multi-part) extension.
        let seq = "seq".to_string();
        for ext in ["jpeg", "jpg", "png", "camera.jpeg", "lidar.pcd", "depth.png"] {
            let filename = format!("image.{}", ext);
            let result = Client::build_filename(&filename, true, Some(&seq), Some(1));
            assert!(
                result.ends_with(&format!(".{}", ext)),
                "Extension .{} not preserved in {}",
                ext,
                result
            );
        }
    }

    #[test]
    fn test_build_filename_flatten_sanitization_compatibility() {
        // Underscored sequence names compose cleanly with the prefix format.
        let seq = "seq_name_with_underscores".to_string();
        let result = Client::build_filename("sample_001.jpg", true, Some(&seq), Some(10));
        assert_eq!(result, "seq_name_with_underscores_10_sample_001.jpg");
    }
}