Skip to main content

briefcase_core/storage/
lakefs.rs

1use super::{FlushResult, SnapshotQuery, StorageBackend, StorageError};
2use crate::models::{DecisionSnapshot, Snapshot};
3#[cfg(feature = "networking")]
4use base64::{engine::general_purpose, Engine as _};
5#[cfg(feature = "networking")]
6use reqwest::{
7    header::{HeaderMap, HeaderValue, AUTHORIZATION, CONTENT_TYPE},
8    Client, RequestBuilder, Response,
9};
10use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12use std::sync::{Arc, Mutex};
13
14// ─── Authentication types ────────────────────────────────────────────────────
15
16/// Supported lakeFS authentication modes.
17///
18/// lakeFS supports multiple authentication mechanisms.  `LakeFSAuth`
19/// captures the configuration needed for each.  Pass one of these to
20/// [`LakeFSConfig::with_auth`] to select the desired mode.
21///
22/// When no explicit auth is configured (`None`), the backend falls back
23/// to Basic auth using `access_key` / `secret_key` from the config.
24#[derive(Debug, Clone)]
25pub enum LakeFSAuth {
26    /// HTTP Basic authentication (access_key:secret_key).
27    /// This is the lakeFS default and the backward-compatible mode.
28    Basic {
29        access_key: String,
30        secret_key: String,
31    },
32
33    /// JWT token authentication.
34    /// The client POSTs to `/api/v1/auth/login` with access_key/secret_key
35    /// and receives a JWT.  The JWT is refreshed before expiry.
36    Token {
37        access_key: String,
38        secret_key: String,
39    },
40
41    /// STS (Security Token Service) federation.
42    /// The client POSTs an external OIDC token to `/sts/login` and receives
43    /// temporary lakeFS credentials (access_key + secret_key + session_token).
44    Sts {
45        /// The external OIDC identity token to exchange.
46        oidc_token: String,
47    },
48
49    /// IAM-based authentication (AWS instance metadata / ECS task role).
50    /// Credentials are fetched from the EC2 instance metadata service or
51    /// ECS container credentials endpoint at `169.254.169.254` / `169.254.170.2`.
52    Iam,
53
54    /// External OIDC provider authentication.
55    /// The client sends an external OIDC token to `/api/v1/oidc/login` and
56    /// receives a lakeFS JWT in return.
57    Oidc {
58        /// The external OIDC identity token.
59        token: String,
60    },
61
62    /// SAML SSO authentication.
63    /// The client POSTs a base64-encoded SAML assertion to
64    /// `/api/v1/auth/external/saml` and receives a lakeFS JWT.
65    Saml {
66        /// Base64-encoded SAML assertion XML.
67        assertion: String,
68    },
69}
70
71// ─── AuthProvider (internal) ─────────────────────────────────────────────────
72
73/// Internal trait for dynamic authentication header management.
74///
75/// Each auth mode implements this trait.  The `LakeFSBackend` calls
76/// `auth_header()` before every HTTP request, allowing token-based modes
77/// to transparently refresh expired credentials.
78#[cfg(all(feature = "async", feature = "networking"))]
79#[async_trait::async_trait]
80pub(crate) trait AuthProvider: Send + Sync {
81    /// Return the current `Authorization` header value.
82    ///
83    /// Implementations that use expiring tokens should check for expiry
84    /// inside this method and refresh transparently.
85    async fn auth_header(&self) -> Result<HeaderValue, StorageError>;
86
87    /// Human-readable name for logging.
88    fn mode_name(&self) -> &'static str;
89}
90
91// ── BasicAuthProvider ────────────────────────────────────────────────────────
92
93#[cfg(all(feature = "async", feature = "networking"))]
94pub(crate) struct BasicAuthProvider {
95    header: HeaderValue,
96}
97
98#[cfg(all(feature = "async", feature = "networking"))]
99impl BasicAuthProvider {
100    pub fn new(access_key: &str, secret_key: &str) -> Result<Self, StorageError> {
101        let credentials = format!("{}:{}", access_key, secret_key);
102        let encoded = general_purpose::STANDARD.encode(credentials.as_bytes());
103        let header_str = format!("Basic {}", encoded);
104        let header = HeaderValue::from_str(&header_str).map_err(|e| {
105            StorageError::ConnectionError(format!("Invalid Basic auth header: {}", e))
106        })?;
107        Ok(Self { header })
108    }
109}
110
111#[cfg(all(feature = "async", feature = "networking"))]
112#[async_trait::async_trait]
113impl AuthProvider for BasicAuthProvider {
114    async fn auth_header(&self) -> Result<HeaderValue, StorageError> {
115        Ok(self.header.clone())
116    }
117    fn mode_name(&self) -> &'static str {
118        "basic"
119    }
120}
121
122// ── TokenAuthProvider (JWT via /api/v1/auth/login) ───────────────────────────
123
124#[cfg(all(feature = "async", feature = "networking"))]
125pub(crate) struct TokenAuthProvider {
126    endpoint: String,
127    access_key: String,
128    secret_key: String,
129    client: Client,
130    /// Cached JWT + expiry (seconds since UNIX epoch).
131    cache: Mutex<Option<(String, u64)>>,
132}
133
134#[cfg(all(feature = "async", feature = "networking"))]
135impl TokenAuthProvider {
136    pub fn new(endpoint: &str, access_key: &str, secret_key: &str) -> Self {
137        let client = Client::builder()
138            .timeout(std::time::Duration::from_secs(10))
139            .build()
140            .unwrap_or_default();
141        Self {
142            endpoint: endpoint.trim_end_matches('/').to_string(),
143            access_key: access_key.to_string(),
144            secret_key: secret_key.to_string(),
145            client,
146            cache: Mutex::new(None),
147        }
148    }
149
150    /// Fetch (or refresh) the JWT.
151    async fn fetch_token(&self) -> Result<(String, u64), StorageError> {
152        #[derive(Serialize)]
153        struct LoginRequest {
154            access_key_id: String,
155            secret_access_key: String,
156        }
157
158        #[derive(Deserialize)]
159        struct LoginResponse {
160            token: String,
161            #[serde(default)]
162            token_expiration: Option<u64>,
163        }
164
165        let url = format!("{}/api/v1/auth/login", self.endpoint);
166        let body = LoginRequest {
167            access_key_id: self.access_key.clone(),
168            secret_access_key: self.secret_key.clone(),
169        };
170
171        let resp = self
172            .client
173            .post(&url)
174            .json(&body)
175            .send()
176            .await
177            .map_err(|e| StorageError::ConnectionError(format!("Token login failed: {}", e)))?;
178
179        if !resp.status().is_success() {
180            let status = resp.status();
181            let text = resp.text().await.unwrap_or_default();
182            return Err(StorageError::ConnectionError(format!(
183                "Token login returned {}: {}",
184                status, text
185            )));
186        }
187
188        let login: LoginResponse = resp.json().await.map_err(|e| {
189            StorageError::ConnectionError(format!("Failed to parse login response: {}", e))
190        })?;
191
192        // Default expiry: 1 hour from now
193        let now_secs = std::time::SystemTime::now()
194            .duration_since(std::time::UNIX_EPOCH)
195            .unwrap_or_default()
196            .as_secs();
197        let expiry = login.token_expiration.unwrap_or(now_secs + 3600);
198
199        Ok((login.token, expiry))
200    }
201}
202
203#[cfg(all(feature = "async", feature = "networking"))]
204#[async_trait::async_trait]
205impl AuthProvider for TokenAuthProvider {
206    async fn auth_header(&self) -> Result<HeaderValue, StorageError> {
207        // Check cache — refresh if expired or within 60s of expiry.
208        let needs_refresh = {
209            let cache = self.cache.lock().unwrap();
210            match &*cache {
211                Some((_token, expiry)) => {
212                    let now = std::time::SystemTime::now()
213                        .duration_since(std::time::UNIX_EPOCH)
214                        .unwrap_or_default()
215                        .as_secs();
216                    now + 60 >= *expiry
217                }
218                None => true,
219            }
220        };
221
222        if needs_refresh {
223            let (token, expiry) = self.fetch_token().await?;
224            let mut cache = self.cache.lock().unwrap();
225            *cache = Some((token, expiry));
226        }
227
228        let cache = self.cache.lock().unwrap();
229        let (token, _) = cache.as_ref().unwrap();
230        let header_str = format!("Bearer {}", token);
231        HeaderValue::from_str(&header_str)
232            .map_err(|e| StorageError::ConnectionError(format!("Invalid Bearer header: {}", e)))
233    }
234
235    fn mode_name(&self) -> &'static str {
236        "token"
237    }
238}
239
240// ── StsAuthProvider (POST /sts/login with OIDC token) ────────────────────────
241
242#[cfg(all(feature = "async", feature = "networking"))]
243pub(crate) struct StsAuthProvider {
244    endpoint: String,
245    oidc_token: String,
246    client: Client,
247    /// Cached temp credentials: (access_key, secret_key, session_token, expiry_secs).
248    cache: Mutex<Option<(String, String, String, u64)>>,
249}
250
251#[cfg(all(feature = "async", feature = "networking"))]
252impl StsAuthProvider {
253    pub fn new(endpoint: &str, oidc_token: &str) -> Self {
254        let client = Client::builder()
255            .timeout(std::time::Duration::from_secs(10))
256            .build()
257            .unwrap_or_default();
258        Self {
259            endpoint: endpoint.trim_end_matches('/').to_string(),
260            oidc_token: oidc_token.to_string(),
261            client,
262            cache: Mutex::new(None),
263        }
264    }
265
266    async fn fetch_temp_credentials(&self) -> Result<(String, String, String, u64), StorageError> {
267        #[derive(Deserialize)]
268        struct StsResponse {
269            #[serde(rename = "Credentials")]
270            credentials: StsCredentials,
271        }
272
273        #[derive(Deserialize)]
274        struct StsCredentials {
275            #[serde(rename = "AccessKeyId")]
276            access_key_id: String,
277            #[serde(rename = "SecretAccessKey")]
278            secret_access_key: String,
279            #[serde(rename = "SessionToken")]
280            session_token: String,
281            #[serde(rename = "Expiration", default)]
282            expiration: Option<u64>,
283        }
284
285        let url = format!("{}/sts/login", self.endpoint);
286
287        let resp = self
288            .client
289            .post(&url)
290            .header("Content-Type", "application/x-www-form-urlencoded")
291            .body(format!(
292                "Action=AssumeRoleWithWebIdentity&WebIdentityToken={}&Version=2011-06-15",
293                self.oidc_token
294            ))
295            .send()
296            .await
297            .map_err(|e| StorageError::ConnectionError(format!("STS login failed: {}", e)))?;
298
299        if !resp.status().is_success() {
300            let status = resp.status();
301            let text = resp.text().await.unwrap_or_default();
302            return Err(StorageError::ConnectionError(format!(
303                "STS login returned {}: {}",
304                status, text
305            )));
306        }
307
308        let sts: StsResponse = resp.json().await.map_err(|e| {
309            StorageError::ConnectionError(format!("Failed to parse STS response: {}", e))
310        })?;
311
312        let now_secs = std::time::SystemTime::now()
313            .duration_since(std::time::UNIX_EPOCH)
314            .unwrap_or_default()
315            .as_secs();
316        let expiry = sts.credentials.expiration.unwrap_or(now_secs + 3600);
317
318        Ok((
319            sts.credentials.access_key_id,
320            sts.credentials.secret_access_key,
321            sts.credentials.session_token,
322            expiry,
323        ))
324    }
325}
326
327#[cfg(all(feature = "async", feature = "networking"))]
328#[async_trait::async_trait]
329impl AuthProvider for StsAuthProvider {
330    async fn auth_header(&self) -> Result<HeaderValue, StorageError> {
331        let needs_refresh = {
332            let cache = self.cache.lock().unwrap();
333            match &*cache {
334                Some((_, _, _, expiry)) => {
335                    let now = std::time::SystemTime::now()
336                        .duration_since(std::time::UNIX_EPOCH)
337                        .unwrap_or_default()
338                        .as_secs();
339                    now + 60 >= *expiry
340                }
341                None => true,
342            }
343        };
344
345        if needs_refresh {
346            let creds = self.fetch_temp_credentials().await?;
347            let mut cache = self.cache.lock().unwrap();
348            *cache = Some(creds);
349        }
350
351        // STS temp credentials use Basic auth with the temp access_key:secret_key.
352        // The session token is sent as `X-Lakefs-Session-Token` but since we only
353        // control the Authorization header here, we encode access_key:secret_key.
354        let cache = self.cache.lock().unwrap();
355        let (ak, sk, _session_token, _) = cache.as_ref().unwrap();
356        let credentials = format!("{}:{}", ak, sk);
357        let encoded = general_purpose::STANDARD.encode(credentials.as_bytes());
358        let header_str = format!("Basic {}", encoded);
359        HeaderValue::from_str(&header_str)
360            .map_err(|e| StorageError::ConnectionError(format!("Invalid STS auth header: {}", e)))
361    }
362
363    fn mode_name(&self) -> &'static str {
364        "sts"
365    }
366}
367
368// ── IamAuthProvider (AWS instance metadata / ECS task role) ──────────────────
369
370#[cfg(all(feature = "async", feature = "networking"))]
371pub(crate) struct IamAuthProvider {
372    client: Client,
373    /// Cached credentials: (access_key, secret_key, token, expiry_secs).
374    cache: Mutex<Option<IamCredentialCache>>,
375}
376
377#[cfg(all(feature = "async", feature = "networking"))]
378type IamCredentialCache = (String, String, Option<String>, u64);
379
380#[cfg(all(feature = "async", feature = "networking"))]
381impl IamAuthProvider {
382    pub fn new() -> Self {
383        let client = Client::builder()
384            .timeout(std::time::Duration::from_secs(5))
385            .build()
386            .unwrap_or_default();
387        Self {
388            client,
389            cache: Mutex::new(None),
390        }
391    }
392
393    /// Try ECS container credentials first, then fall back to EC2 instance metadata.
394    async fn fetch_credentials(&self) -> Result<IamCredentialCache, StorageError> {
395        // 1. Try ECS container credentials endpoint
396        if let Ok(creds_uri) = std::env::var("AWS_CONTAINER_CREDENTIALS_RELATIVE_URI") {
397            let url = format!("http://169.254.170.2{}", creds_uri);
398            if let Ok(resp) = self.client.get(&url).send().await {
399                if resp.status().is_success() {
400                    if let Ok(creds) = resp.json::<IamCredentials>().await {
401                        let now_secs = std::time::SystemTime::now()
402                            .duration_since(std::time::UNIX_EPOCH)
403                            .unwrap_or_default()
404                            .as_secs();
405                        return Ok((
406                            creds.access_key_id,
407                            creds.secret_access_key,
408                            creds.token,
409                            now_secs + 3600,
410                        ));
411                    }
412                }
413            }
414        }
415
416        // 2. Fall back to EC2 instance metadata (IMDSv2)
417        let token_resp = self
418            .client
419            .put("http://169.254.169.254/latest/api/token")
420            .header("X-aws-ec2-metadata-token-ttl-seconds", "21600")
421            .send()
422            .await
423            .map_err(|e| {
424                StorageError::ConnectionError(format!("IAM: Failed to fetch IMDS token: {}", e))
425            })?;
426
427        let imds_token = token_resp.text().await.map_err(|e| {
428            StorageError::ConnectionError(format!("IAM: Failed to read IMDS token: {}", e))
429        })?;
430
431        // Get role name
432        let role_resp = self
433            .client
434            .get("http://169.254.169.254/latest/meta-data/iam/security-credentials/")
435            .header("X-aws-ec2-metadata-token", &imds_token)
436            .send()
437            .await
438            .map_err(|e| {
439                StorageError::ConnectionError(format!("IAM: Failed to fetch role name: {}", e))
440            })?;
441
442        let role_name = role_resp.text().await.map_err(|e| {
443            StorageError::ConnectionError(format!("IAM: Failed to read role name: {}", e))
444        })?;
445        let role_name = role_name.trim();
446
447        // Get credentials for role
448        let creds_url = format!(
449            "http://169.254.169.254/latest/meta-data/iam/security-credentials/{}",
450            role_name
451        );
452        let creds_resp = self
453            .client
454            .get(&creds_url)
455            .header("X-aws-ec2-metadata-token", &imds_token)
456            .send()
457            .await
458            .map_err(|e| {
459                StorageError::ConnectionError(format!("IAM: Failed to fetch credentials: {}", e))
460            })?;
461
462        let creds: IamCredentials = creds_resp.json().await.map_err(|e| {
463            StorageError::ConnectionError(format!("IAM: Failed to parse credentials: {}", e))
464        })?;
465
466        let now_secs = std::time::SystemTime::now()
467            .duration_since(std::time::UNIX_EPOCH)
468            .unwrap_or_default()
469            .as_secs();
470
471        Ok((
472            creds.access_key_id,
473            creds.secret_access_key,
474            creds.token,
475            now_secs + 3600,
476        ))
477    }
478}
479
480#[cfg(all(feature = "async", feature = "networking"))]
481#[derive(Deserialize)]
482struct IamCredentials {
483    #[serde(rename = "AccessKeyId", alias = "accessKeyId")]
484    access_key_id: String,
485    #[serde(rename = "SecretAccessKey", alias = "secretAccessKey")]
486    secret_access_key: String,
487    #[serde(rename = "Token", alias = "token", alias = "SessionToken", default)]
488    token: Option<String>,
489}
490
491#[cfg(all(feature = "async", feature = "networking"))]
492#[async_trait::async_trait]
493impl AuthProvider for IamAuthProvider {
494    async fn auth_header(&self) -> Result<HeaderValue, StorageError> {
495        let needs_refresh = {
496            let cache = self.cache.lock().unwrap();
497            match &*cache {
498                Some((_, _, _, expiry)) => {
499                    let now = std::time::SystemTime::now()
500                        .duration_since(std::time::UNIX_EPOCH)
501                        .unwrap_or_default()
502                        .as_secs();
503                    now + 60 >= *expiry
504                }
505                None => true,
506            }
507        };
508
509        if needs_refresh {
510            let creds = self.fetch_credentials().await?;
511            let mut cache = self.cache.lock().unwrap();
512            *cache = Some(creds);
513        }
514
515        let cache = self.cache.lock().unwrap();
516        let (ak, sk, _, _) = cache.as_ref().unwrap();
517        let credentials = format!("{}:{}", ak, sk);
518        let encoded = general_purpose::STANDARD.encode(credentials.as_bytes());
519        let header_str = format!("Basic {}", encoded);
520        HeaderValue::from_str(&header_str)
521            .map_err(|e| StorageError::ConnectionError(format!("Invalid IAM auth header: {}", e)))
522    }
523
524    fn mode_name(&self) -> &'static str {
525        "iam"
526    }
527}
528
529// ── OidcAuthProvider (POST /api/v1/oidc/login) ──────────────────────────────
530
531#[cfg(all(feature = "async", feature = "networking"))]
532pub(crate) struct OidcAuthProvider {
533    endpoint: String,
534    external_token: String,
535    client: Client,
536    /// Cached lakeFS JWT + expiry.
537    cache: Mutex<Option<(String, u64)>>,
538}
539
540#[cfg(all(feature = "async", feature = "networking"))]
541impl OidcAuthProvider {
542    pub fn new(endpoint: &str, external_token: &str) -> Self {
543        let client = Client::builder()
544            .timeout(std::time::Duration::from_secs(10))
545            .build()
546            .unwrap_or_default();
547        Self {
548            endpoint: endpoint.trim_end_matches('/').to_string(),
549            external_token: external_token.to_string(),
550            client,
551            cache: Mutex::new(None),
552        }
553    }
554
555    async fn exchange_token(&self) -> Result<(String, u64), StorageError> {
556        #[derive(Deserialize)]
557        struct OidcResponse {
558            token: String,
559            #[serde(default)]
560            token_expiration: Option<u64>,
561        }
562
563        let url = format!("{}/api/v1/oidc/login", self.endpoint);
564
565        let resp = self
566            .client
567            .post(&url)
568            .header("Content-Type", "application/x-www-form-urlencoded")
569            .body(format!("code={}", self.external_token))
570            .send()
571            .await
572            .map_err(|e| StorageError::ConnectionError(format!("OIDC login failed: {}", e)))?;
573
574        if !resp.status().is_success() {
575            let status = resp.status();
576            let text = resp.text().await.unwrap_or_default();
577            return Err(StorageError::ConnectionError(format!(
578                "OIDC login returned {}: {}",
579                status, text
580            )));
581        }
582
583        let oidc: OidcResponse = resp.json().await.map_err(|e| {
584            StorageError::ConnectionError(format!("Failed to parse OIDC response: {}", e))
585        })?;
586
587        let now_secs = std::time::SystemTime::now()
588            .duration_since(std::time::UNIX_EPOCH)
589            .unwrap_or_default()
590            .as_secs();
591        let expiry = oidc.token_expiration.unwrap_or(now_secs + 3600);
592
593        Ok((oidc.token, expiry))
594    }
595}
596
597#[cfg(all(feature = "async", feature = "networking"))]
598#[async_trait::async_trait]
599impl AuthProvider for OidcAuthProvider {
600    async fn auth_header(&self) -> Result<HeaderValue, StorageError> {
601        let needs_refresh = {
602            let cache = self.cache.lock().unwrap();
603            match &*cache {
604                Some((_token, expiry)) => {
605                    let now = std::time::SystemTime::now()
606                        .duration_since(std::time::UNIX_EPOCH)
607                        .unwrap_or_default()
608                        .as_secs();
609                    now + 60 >= *expiry
610                }
611                None => true,
612            }
613        };
614
615        if needs_refresh {
616            let (token, expiry) = self.exchange_token().await?;
617            let mut cache = self.cache.lock().unwrap();
618            *cache = Some((token, expiry));
619        }
620
621        let cache = self.cache.lock().unwrap();
622        let (token, _) = cache.as_ref().unwrap();
623        let header_str = format!("Bearer {}", token);
624        HeaderValue::from_str(&header_str).map_err(|e| {
625            StorageError::ConnectionError(format!("Invalid OIDC Bearer header: {}", e))
626        })
627    }
628
629    fn mode_name(&self) -> &'static str {
630        "oidc"
631    }
632}
633
634// ── SamlAuthProvider (POST /api/v1/auth/external/saml) ──────────────────────
635
636#[cfg(all(feature = "async", feature = "networking"))]
637pub(crate) struct SamlAuthProvider {
638    endpoint: String,
639    assertion: String,
640    client: Client,
641    /// Cached lakeFS JWT + expiry.
642    cache: Mutex<Option<(String, u64)>>,
643}
644
645#[cfg(all(feature = "async", feature = "networking"))]
646impl SamlAuthProvider {
647    pub fn new(endpoint: &str, assertion: &str) -> Self {
648        let client = Client::builder()
649            .timeout(std::time::Duration::from_secs(10))
650            .build()
651            .unwrap_or_default();
652        Self {
653            endpoint: endpoint.trim_end_matches('/').to_string(),
654            assertion: assertion.to_string(),
655            client,
656            cache: Mutex::new(None),
657        }
658    }
659
660    async fn exchange_assertion(&self) -> Result<(String, u64), StorageError> {
661        #[derive(Deserialize)]
662        struct SamlResponse {
663            token: String,
664            #[serde(default)]
665            token_expiration: Option<u64>,
666        }
667
668        let url = format!("{}/api/v1/auth/external/saml", self.endpoint);
669
670        let resp = self
671            .client
672            .post(&url)
673            .header("Content-Type", "application/x-www-form-urlencoded")
674            .body(format!("SAMLResponse={}", self.assertion))
675            .send()
676            .await
677            .map_err(|e| StorageError::ConnectionError(format!("SAML login failed: {}", e)))?;
678
679        if !resp.status().is_success() {
680            let status = resp.status();
681            let text = resp.text().await.unwrap_or_default();
682            return Err(StorageError::ConnectionError(format!(
683                "SAML login returned {}: {}",
684                status, text
685            )));
686        }
687
688        let saml: SamlResponse = resp.json().await.map_err(|e| {
689            StorageError::ConnectionError(format!("Failed to parse SAML response: {}", e))
690        })?;
691
692        let now_secs = std::time::SystemTime::now()
693            .duration_since(std::time::UNIX_EPOCH)
694            .unwrap_or_default()
695            .as_secs();
696        let expiry = saml.token_expiration.unwrap_or(now_secs + 3600);
697
698        Ok((saml.token, expiry))
699    }
700}
701
702#[cfg(all(feature = "async", feature = "networking"))]
703#[async_trait::async_trait]
704impl AuthProvider for SamlAuthProvider {
705    async fn auth_header(&self) -> Result<HeaderValue, StorageError> {
706        let needs_refresh = {
707            let cache = self.cache.lock().unwrap();
708            match &*cache {
709                Some((_token, expiry)) => {
710                    let now = std::time::SystemTime::now()
711                        .duration_since(std::time::UNIX_EPOCH)
712                        .unwrap_or_default()
713                        .as_secs();
714                    now + 60 >= *expiry
715                }
716                None => true,
717            }
718        };
719
720        if needs_refresh {
721            let (token, expiry) = self.exchange_assertion().await?;
722            let mut cache = self.cache.lock().unwrap();
723            *cache = Some((token, expiry));
724        }
725
726        let cache = self.cache.lock().unwrap();
727        let (token, _) = cache.as_ref().unwrap();
728        let header_str = format!("Bearer {}", token);
729        HeaderValue::from_str(&header_str).map_err(|e| {
730            StorageError::ConnectionError(format!("Invalid SAML Bearer header: {}", e))
731        })
732    }
733
734    fn mode_name(&self) -> &'static str {
735        "saml"
736    }
737}
738
739// ── Build AuthProvider from LakeFSAuth enum ─────────────────────────────────
740
741#[cfg(all(feature = "async", feature = "networking"))]
742fn build_auth_provider(
743    auth: &LakeFSAuth,
744    endpoint: &str,
745) -> Result<Arc<dyn AuthProvider>, StorageError> {
746    match auth {
747        LakeFSAuth::Basic {
748            access_key,
749            secret_key,
750        } => Ok(Arc::new(BasicAuthProvider::new(access_key, secret_key)?)),
751        LakeFSAuth::Token {
752            access_key,
753            secret_key,
754        } => Ok(Arc::new(TokenAuthProvider::new(
755            endpoint, access_key, secret_key,
756        ))),
757        LakeFSAuth::Sts { oidc_token } => Ok(Arc::new(StsAuthProvider::new(endpoint, oidc_token))),
758        LakeFSAuth::Iam => Ok(Arc::new(IamAuthProvider::new())),
759        LakeFSAuth::Oidc { token } => Ok(Arc::new(OidcAuthProvider::new(endpoint, token))),
760        LakeFSAuth::Saml { assertion } => Ok(Arc::new(SamlAuthProvider::new(endpoint, assertion))),
761    }
762}
763
764// ─── New response / request types ────────────────────────────────────────────
765
766/// Result of a branch merge operation.
767#[derive(Debug, Clone, Serialize, Deserialize)]
768pub struct MergeResult {
769    /// The commit ID produced by the merge (empty string on conflict).
770    pub commit_id: String,
771    /// Summary text returned by LakeFS.
772    pub summary: String,
773}
774
775/// Information about a single LakeFS commit.
776#[derive(Debug, Clone, Serialize, Deserialize)]
777pub struct CommitInfo {
778    pub id: String,
779    pub message: String,
780    pub committer: String,
781    #[serde(default)]
782    pub metadata: HashMap<String, String>,
783    /// Unix-epoch seconds.  LakeFS returns `creation_date` as an i64.
784    #[serde(default)]
785    pub creation_date: i64,
786}
787
788/// A single entry in a diff listing.
789#[derive(Debug, Clone, Serialize, Deserialize)]
790pub struct DiffEntry {
791    /// Object path.
792    pub path: String,
793    /// "added", "removed", "changed", or "conflict".
794    #[serde(rename = "type")]
795    pub diff_type: String,
796    /// Size in bytes (only for added / changed).
797    #[serde(default)]
798    pub size_bytes: Option<u64>,
799}
800
801/// Object metadata returned by the stat endpoint.
802#[derive(Debug, Clone, Serialize, Deserialize)]
803pub struct ObjectStats {
804    pub path: String,
805    #[serde(default)]
806    pub physical_address: String,
807    pub size_bytes: u64,
808    #[serde(default)]
809    pub content_type: String,
810    /// Unix-epoch seconds.
811    #[serde(default)]
812    pub mtime: i64,
813    #[serde(default)]
814    pub checksum: String,
815}
816
817/// Describes a LakeFS Action (webhook) to register.
818#[derive(Debug, Clone, Serialize, Deserialize)]
819pub struct LakeFSAction {
820    pub name: String,
821    /// e.g. `["pre-commit", "post-merge"]`
822    pub on: Vec<String>,
823    pub hooks: Vec<ActionHook>,
824}
825
826/// A single hook inside a LakeFS Action.
827#[derive(Debug, Clone, Serialize, Deserialize)]
828pub struct ActionHook {
829    pub id: String,
830    #[serde(rename = "type")]
831    pub hook_type: String,
832    pub description: String,
833    #[serde(default)]
834    pub properties: HashMap<String, String>,
835}
836
837/// Repository information returned by LakeFS.
838#[derive(Debug, Clone, Serialize, Deserialize)]
839pub struct RepositoryInfo {
840    pub id: String,
841    pub storage_namespace: String,
842    pub default_branch: String,
843    #[serde(default)]
844    pub creation_date: i64,
845}
846
847/// Branch reference returned by LakeFS.
848#[derive(Debug, Clone, Serialize, Deserialize)]
849pub struct BranchInfo {
850    pub id: String,
851    pub commit_id: String,
852}
853
854// ─── Core backend ────────────────────────────────────────────────────────────
855
856#[cfg(all(feature = "async", feature = "networking"))]
857#[derive(Clone)]
858pub struct LakeFSBackend {
859    client: Client,
860    endpoint: String,
861    repository: String, // "engagement" in Briefcase terminology
862    branch: String,     // "workstream" in Briefcase terminology
863    #[allow(dead_code)]
864    access_key: String,
865    #[allow(dead_code)]
866    secret_key: String,
867    /// Per-request auth provider.  Replaces static default headers so that
868    /// token-based modes (JWT, STS, OIDC, SAML) can refresh transparently.
869    auth_provider: Arc<dyn AuthProvider>,
870    pending_writes: Arc<Mutex<Vec<Snapshot>>>,
871}
872
873#[cfg(all(feature = "async", feature = "networking"))]
874impl LakeFSBackend {
875    pub fn new(config: LakeFSConfig) -> Result<Self, StorageError> {
876        let endpoint = config.endpoint.trim_end_matches('/').to_string();
877
878        // Build auth provider from explicit config or default to Basic.
879        let auth = config.auth.clone().unwrap_or_else(|| LakeFSAuth::Basic {
880            access_key: config.access_key.clone(),
881            secret_key: config.secret_key.clone(),
882        });
883        let auth_provider = build_auth_provider(&auth, &endpoint)?;
884
885        // Default headers — Content-Type only.  Authorization is per-request.
886        let mut headers = HeaderMap::new();
887        headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
888
889        let client = Client::builder()
890            .default_headers(headers)
891            .timeout(std::time::Duration::from_secs(30))
892            .build()
893            .map_err(|e| {
894                StorageError::ConnectionError(format!("Failed to create HTTP client: {}", e))
895            })?;
896
897        Ok(Self {
898            client,
899            endpoint,
900            repository: config.repository,
901            branch: config.branch,
902            access_key: config.access_key,
903            secret_key: config.secret_key,
904            auth_provider,
905            pending_writes: Arc::new(Mutex::new(Vec::new())),
906        })
907    }
908
909    /// Send an HTTP request with the current authorization header.
910    ///
911    /// All lakeFS API calls route through this method so that token refresh
912    /// happens transparently for JWT/STS/OIDC/SAML modes.
913    async fn send_authed(&self, builder: RequestBuilder) -> Result<Response, StorageError> {
914        let auth_header = self.auth_provider.auth_header().await?;
915        builder
916            .header(AUTHORIZATION, auth_header)
917            .send()
918            .await
919            .map_err(|e| StorageError::ConnectionError(format!("Request failed: {}", e)))
920    }
921
922    /// Returns the active authentication mode name (for logging/diagnostics).
923    pub fn auth_mode(&self) -> &'static str {
924        self.auth_provider.mode_name()
925    }
926
927    // ── Accessors ────────────────────────────────────────────────────────
928
929    /// Returns the configured repository name.
930    pub fn repository(&self) -> &str {
931        &self.repository
932    }
933
934    /// Returns the configured branch name.
935    pub fn branch(&self) -> &str {
936        &self.branch
937    }
938
939    /// Returns the configured endpoint URL.
940    pub fn endpoint(&self) -> &str {
941        &self.endpoint
942    }
943
944    // ── Commit operations ────────────────────────────────────────────────
945
946    /// Create a commit (checkpoint) with pending writes.
947    pub async fn create_commit(&self, message: &str) -> Result<String, StorageError> {
948        let url = format!(
949            "{}/api/v1/repositories/{}/branches/{}/commits",
950            self.endpoint, self.repository, self.branch
951        );
952
953        #[derive(Serialize)]
954        struct CommitRequest {
955            message: String,
956            metadata: HashMap<String, String>,
957        }
958
959        let mut metadata = HashMap::new();
960        metadata.insert("source".to_string(), "briefcase-ai".to_string());
961
962        let request = CommitRequest {
963            message: message.to_string(),
964            metadata,
965        };
966
967        let response = self
968            .send_authed(self.client.post(&url).json(&request))
969            .await?;
970
971        let status = response.status();
972        if !status.is_success() {
973            let error_text = response.text().await.unwrap_or_default();
974            return Err(StorageError::ConnectionError(format!(
975                "Commit failed with status {}: {}",
976                status, error_text
977            )));
978        }
979
980        #[derive(Deserialize)]
981        struct CommitResponse {
982            id: String,
983        }
984
985        let commit_response: CommitResponse = response.json().await.map_err(|e| {
986            StorageError::SerializationError(format!("Failed to parse commit response: {}", e))
987        })?;
988
989        Ok(commit_response.id)
990    }
991
992    // ── Object operations ────────────────────────────────────────────────
993
994    /// Upload object to LakeFS.
995    pub async fn upload_object(&self, path: &str, data: &[u8]) -> Result<(), StorageError> {
996        let url = format!(
997            "{}/api/v1/repositories/{}/branches/{}/objects",
998            self.endpoint, self.repository, self.branch
999        );
1000
1001        let response = self
1002            .send_authed(
1003                self.client
1004                    .put(&url)
1005                    .query(&[("path", path)])
1006                    .header("Content-Type", "application/octet-stream")
1007                    .body(data.to_vec()),
1008            )
1009            .await?;
1010
1011        let status = response.status();
1012        if !status.is_success() {
1013            let error_text = response.text().await.unwrap_or_default();
1014            return Err(StorageError::ConnectionError(format!(
1015                "Upload failed with status {}: {}",
1016                status, error_text
1017            )));
1018        }
1019
1020        Ok(())
1021    }
1022
1023    /// Download object from LakeFS.
1024    pub async fn download_object(&self, path: &str) -> Result<Vec<u8>, StorageError> {
1025        let url = format!(
1026            "{}/api/v1/repositories/{}/refs/{}/objects",
1027            self.endpoint, self.repository, self.branch
1028        );
1029
1030        let response = self
1031            .send_authed(self.client.get(&url).query(&[("path", path)]))
1032            .await?;
1033
1034        if response.status() == reqwest::StatusCode::NOT_FOUND {
1035            return Err(StorageError::NotFound(format!(
1036                "Object not found: {}",
1037                path
1038            )));
1039        }
1040
1041        let status = response.status();
1042        if !status.is_success() {
1043            let error_text = response.text().await.unwrap_or_default();
1044            return Err(StorageError::ConnectionError(format!(
1045                "Download failed with status {}: {}",
1046                status, error_text
1047            )));
1048        }
1049
1050        let data = response.bytes().await.map_err(|e| {
1051            StorageError::ConnectionError(format!("Failed to read response: {}", e))
1052        })?;
1053
1054        Ok(data.to_vec())
1055    }
1056
1057    /// List objects with prefix.
1058    pub async fn list_objects(&self, prefix: &str) -> Result<Vec<String>, StorageError> {
1059        let url = format!(
1060            "{}/api/v1/repositories/{}/refs/{}/objects/ls",
1061            self.endpoint, self.repository, self.branch
1062        );
1063
1064        let response = self
1065            .send_authed(self.client.get(&url).query(&[("prefix", prefix)]))
1066            .await?;
1067
1068        let status = response.status();
1069        if !status.is_success() {
1070            let error_text = response.text().await.unwrap_or_default();
1071            return Err(StorageError::ConnectionError(format!(
1072                "List failed with status {}: {}",
1073                status, error_text
1074            )));
1075        }
1076
1077        #[derive(Deserialize)]
1078        struct ListResponse {
1079            results: Vec<ObjectInfo>,
1080        }
1081
1082        #[derive(Deserialize)]
1083        struct ObjectInfo {
1084            path: String,
1085            #[serde(rename = "type")]
1086            object_type: String,
1087        }
1088
1089        let list_response: ListResponse = response.json().await.map_err(|e| {
1090            StorageError::SerializationError(format!("Failed to parse list response: {}", e))
1091        })?;
1092
1093        let paths = list_response
1094            .results
1095            .into_iter()
1096            .filter(|obj| obj.object_type == "object")
1097            .map(|obj| obj.path)
1098            .collect();
1099
1100        Ok(paths)
1101    }
1102
1103    /// Delete an object from the current branch.
1104    pub async fn delete_object(&self, path: &str) -> Result<(), StorageError> {
1105        if path.is_empty() {
1106            return Err(StorageError::InvalidQuery(
1107                "Object path cannot be empty".to_string(),
1108            ));
1109        }
1110
1111        let url = format!(
1112            "{}/api/v1/repositories/{}/branches/{}/objects",
1113            self.endpoint, self.repository, self.branch
1114        );
1115
1116        let response = self
1117            .send_authed(self.client.delete(&url).query(&[("path", path)]))
1118            .await?;
1119
1120        if response.status() == reqwest::StatusCode::NOT_FOUND {
1121            return Err(StorageError::NotFound(format!(
1122                "Object not found: {}",
1123                path
1124            )));
1125        }
1126
1127        let status = response.status();
1128        if !status.is_success() {
1129            let error_text = response.text().await.unwrap_or_default();
1130            return Err(StorageError::ConnectionError(format!(
1131                "Delete failed with status {}: {}",
1132                status, error_text
1133            )));
1134        }
1135
1136        Ok(())
1137    }
1138
1139    /// Get object statistics/metadata.
1140    pub async fn get_object_stats(&self, path: &str) -> Result<ObjectStats, StorageError> {
1141        if path.is_empty() {
1142            return Err(StorageError::InvalidQuery(
1143                "Object path cannot be empty".to_string(),
1144            ));
1145        }
1146
1147        let url = format!(
1148            "{}/api/v1/repositories/{}/refs/{}/objects/stat",
1149            self.endpoint, self.repository, self.branch
1150        );
1151
1152        let response = self
1153            .send_authed(self.client.get(&url).query(&[("path", path)]))
1154            .await?;
1155
1156        if response.status() == reqwest::StatusCode::NOT_FOUND {
1157            return Err(StorageError::NotFound(format!(
1158                "Object not found: {}",
1159                path
1160            )));
1161        }
1162
1163        let status = response.status();
1164        if !status.is_success() {
1165            let error_text = response.text().await.unwrap_or_default();
1166            return Err(StorageError::ConnectionError(format!(
1167                "Stats failed with status {}: {}",
1168                status, error_text
1169            )));
1170        }
1171
1172        let stats: ObjectStats = response.json().await.map_err(|e| {
1173            StorageError::SerializationError(format!("Failed to parse stats response: {}", e))
1174        })?;
1175
1176        Ok(stats)
1177    }
1178
1179    // ── Repository operations ────────────────────────────────────────────
1180
1181    /// Create a new LakeFS repository (engagement).
1182    pub async fn create_repository(
1183        &self,
1184        name: &str,
1185        storage_namespace: &str,
1186        default_branch: Option<&str>,
1187    ) -> Result<RepositoryInfo, StorageError> {
1188        if name.is_empty() {
1189            return Err(StorageError::InvalidQuery(
1190                "Repository name cannot be empty".to_string(),
1191            ));
1192        }
1193        if storage_namespace.is_empty() {
1194            return Err(StorageError::InvalidQuery(
1195                "Storage namespace cannot be empty".to_string(),
1196            ));
1197        }
1198
1199        let url = format!("{}/api/v1/repositories", self.endpoint);
1200
1201        #[derive(Serialize)]
1202        struct CreateRepoRequest {
1203            name: String,
1204            storage_namespace: String,
1205            default_branch: String,
1206        }
1207
1208        let request = CreateRepoRequest {
1209            name: name.to_string(),
1210            storage_namespace: storage_namespace.to_string(),
1211            default_branch: default_branch.unwrap_or("main").to_string(),
1212        };
1213
1214        let response = self
1215            .send_authed(self.client.post(&url).json(&request))
1216            .await?;
1217
1218        if response.status() == reqwest::StatusCode::CONFLICT {
1219            return Err(StorageError::ConnectionError(format!(
1220                "Repository '{}' already exists",
1221                name
1222            )));
1223        }
1224
1225        let status = response.status();
1226        if !status.is_success() {
1227            let error_text = response.text().await.unwrap_or_default();
1228            return Err(StorageError::ConnectionError(format!(
1229                "Create repository failed with status {}: {}",
1230                status, error_text
1231            )));
1232        }
1233
1234        let info: RepositoryInfo = response.json().await.map_err(|e| {
1235            StorageError::SerializationError(format!("Failed to parse repository response: {}", e))
1236        })?;
1237
1238        Ok(info)
1239    }
1240
1241    /// Delete a LakeFS repository.
1242    pub async fn delete_repository(&self, name: &str) -> Result<(), StorageError> {
1243        if name.is_empty() {
1244            return Err(StorageError::InvalidQuery(
1245                "Repository name cannot be empty".to_string(),
1246            ));
1247        }
1248
1249        let url = format!("{}/api/v1/repositories/{}", self.endpoint, name);
1250
1251        let response = self.send_authed(self.client.delete(&url)).await?;
1252
1253        if response.status() == reqwest::StatusCode::NOT_FOUND {
1254            return Err(StorageError::NotFound(format!(
1255                "Repository not found: {}",
1256                name
1257            )));
1258        }
1259
1260        let status = response.status();
1261        if !status.is_success() {
1262            let error_text = response.text().await.unwrap_or_default();
1263            return Err(StorageError::ConnectionError(format!(
1264                "Delete repository failed with status {}: {}",
1265                status, error_text
1266            )));
1267        }
1268
1269        Ok(())
1270    }
1271
1272    // ── Branch operations ────────────────────────────────────────────────
1273
1274    /// Create a new branch (workstream) from a source branch.
1275    pub async fn create_branch(
1276        &self,
1277        name: &str,
1278        source_branch: &str,
1279    ) -> Result<String, StorageError> {
1280        if name.is_empty() {
1281            return Err(StorageError::InvalidQuery(
1282                "Branch name cannot be empty".to_string(),
1283            ));
1284        }
1285        if source_branch.is_empty() {
1286            return Err(StorageError::InvalidQuery(
1287                "Source branch cannot be empty".to_string(),
1288            ));
1289        }
1290
1291        let url = format!(
1292            "{}/api/v1/repositories/{}/branches",
1293            self.endpoint, self.repository
1294        );
1295
1296        #[derive(Serialize)]
1297        struct CreateBranchRequest {
1298            name: String,
1299            source: String,
1300        }
1301
1302        let request = CreateBranchRequest {
1303            name: name.to_string(),
1304            source: source_branch.to_string(),
1305        };
1306
1307        let response = self
1308            .send_authed(self.client.post(&url).json(&request))
1309            .await?;
1310
1311        if response.status() == reqwest::StatusCode::CONFLICT {
1312            return Err(StorageError::ConnectionError(format!(
1313                "Branch '{}' already exists",
1314                name
1315            )));
1316        }
1317
1318        let status = response.status();
1319        if !status.is_success() {
1320            let error_text = response.text().await.unwrap_or_default();
1321            return Err(StorageError::ConnectionError(format!(
1322                "Create branch failed with status {}: {}",
1323                status, error_text
1324            )));
1325        }
1326
1327        // LakeFS returns the commit ID the new branch points to as a plain string
1328        let commit_id = response.text().await.map_err(|e| {
1329            StorageError::SerializationError(format!("Failed to read branch response: {}", e))
1330        })?;
1331
1332        // Strip surrounding quotes if present (LakeFS returns JSON string)
1333        let commit_id = commit_id.trim().trim_matches('"').to_string();
1334        Ok(commit_id)
1335    }
1336
1337    /// Delete a branch.
1338    pub async fn delete_branch(&self, name: &str) -> Result<(), StorageError> {
1339        if name.is_empty() {
1340            return Err(StorageError::InvalidQuery(
1341                "Branch name cannot be empty".to_string(),
1342            ));
1343        }
1344
1345        let url = format!(
1346            "{}/api/v1/repositories/{}/branches/{}",
1347            self.endpoint, self.repository, name
1348        );
1349
1350        let response = self.send_authed(self.client.delete(&url)).await?;
1351
1352        if response.status() == reqwest::StatusCode::NOT_FOUND {
1353            return Err(StorageError::NotFound(format!(
1354                "Branch not found: {}",
1355                name
1356            )));
1357        }
1358
1359        let status = response.status();
1360        if !status.is_success() {
1361            let error_text = response.text().await.unwrap_or_default();
1362            return Err(StorageError::ConnectionError(format!(
1363                "Delete branch failed with status {}: {}",
1364                status, error_text
1365            )));
1366        }
1367
1368        Ok(())
1369    }
1370
1371    /// Merge source branch into destination branch.
1372    pub async fn merge_branch(
1373        &self,
1374        source_branch: &str,
1375        destination_branch: &str,
1376        message: Option<&str>,
1377    ) -> Result<MergeResult, StorageError> {
1378        if source_branch.is_empty() || destination_branch.is_empty() {
1379            return Err(StorageError::InvalidQuery(
1380                "Source and destination branches cannot be empty".to_string(),
1381            ));
1382        }
1383
1384        let url = format!(
1385            "{}/api/v1/repositories/{}/refs/{}/merge/{}",
1386            self.endpoint, self.repository, source_branch, destination_branch
1387        );
1388
1389        #[derive(Serialize)]
1390        struct MergeRequest {
1391            message: String,
1392            metadata: HashMap<String, String>,
1393        }
1394
1395        let mut metadata = HashMap::new();
1396        metadata.insert("source".to_string(), "briefcase-ai".to_string());
1397
1398        let request = MergeRequest {
1399            message: message
1400                .unwrap_or(&format!(
1401                    "Merge {} into {}",
1402                    source_branch, destination_branch
1403                ))
1404                .to_string(),
1405            metadata,
1406        };
1407
1408        let response = self
1409            .send_authed(self.client.post(&url).json(&request))
1410            .await?;
1411
1412        if response.status() == reqwest::StatusCode::CONFLICT {
1413            return Err(StorageError::ConnectionError(
1414                "Merge conflict detected".to_string(),
1415            ));
1416        }
1417
1418        let status = response.status();
1419        if !status.is_success() {
1420            let error_text = response.text().await.unwrap_or_default();
1421            return Err(StorageError::ConnectionError(format!(
1422                "Merge failed with status {}: {}",
1423                status, error_text
1424            )));
1425        }
1426
1427        // LakeFS merge returns a reference (commit ID)
1428        #[derive(Deserialize)]
1429        struct MergeResponse {
1430            #[serde(default)]
1431            reference: String,
1432            #[serde(default)]
1433            summary: HashMap<String, serde_json::Value>,
1434        }
1435
1436        let merge_resp: MergeResponse = response.json().await.map_err(|e| {
1437            StorageError::SerializationError(format!("Failed to parse merge response: {}", e))
1438        })?;
1439
1440        Ok(MergeResult {
1441            commit_id: merge_resp.reference,
1442            summary: serde_json::to_string(&merge_resp.summary).unwrap_or_default(),
1443        })
1444    }
1445
1446    // ── Commit operations ────────────────────────────────────────────────
1447
1448    /// Get details of a specific commit.
1449    pub async fn get_commit(&self, commit_id: &str) -> Result<CommitInfo, StorageError> {
1450        if commit_id.is_empty() {
1451            return Err(StorageError::InvalidQuery(
1452                "Commit ID cannot be empty".to_string(),
1453            ));
1454        }
1455
1456        let url = format!(
1457            "{}/api/v1/repositories/{}/commits/{}",
1458            self.endpoint, self.repository, commit_id
1459        );
1460
1461        let response = self.send_authed(self.client.get(&url)).await?;
1462
1463        if response.status() == reqwest::StatusCode::NOT_FOUND {
1464            return Err(StorageError::NotFound(format!(
1465                "Commit not found: {}",
1466                commit_id
1467            )));
1468        }
1469
1470        let status = response.status();
1471        if !status.is_success() {
1472            let error_text = response.text().await.unwrap_or_default();
1473            return Err(StorageError::ConnectionError(format!(
1474                "Get commit failed with status {}: {}",
1475                status, error_text
1476            )));
1477        }
1478
1479        let info: CommitInfo = response.json().await.map_err(|e| {
1480            StorageError::SerializationError(format!("Failed to parse commit response: {}", e))
1481        })?;
1482
1483        Ok(info)
1484    }
1485
1486    // ── Diff operations ──────────────────────────────────────────────────
1487
1488    /// Diff between two LakeFS references (branches, tags, or commit IDs).
1489    pub async fn diff_refs(
1490        &self,
1491        left_ref: &str,
1492        right_ref: &str,
1493    ) -> Result<Vec<DiffEntry>, StorageError> {
1494        if left_ref.is_empty() || right_ref.is_empty() {
1495            return Err(StorageError::InvalidQuery(
1496                "Both refs must be non-empty".to_string(),
1497            ));
1498        }
1499
1500        let url = format!(
1501            "{}/api/v1/repositories/{}/refs/{}/diff/{}",
1502            self.endpoint, self.repository, left_ref, right_ref
1503        );
1504
1505        let response = self.send_authed(self.client.get(&url)).await?;
1506
1507        let status = response.status();
1508        if !status.is_success() {
1509            let error_text = response.text().await.unwrap_or_default();
1510            return Err(StorageError::ConnectionError(format!(
1511                "Diff failed with status {}: {}",
1512                status, error_text
1513            )));
1514        }
1515
1516        #[derive(Deserialize)]
1517        struct DiffResponse {
1518            results: Vec<DiffEntry>,
1519        }
1520
1521        let diff_resp: DiffResponse = response.json().await.map_err(|e| {
1522            StorageError::SerializationError(format!("Failed to parse diff response: {}", e))
1523        })?;
1524
1525        Ok(diff_resp.results)
1526    }
1527
1528    // ── Actions (webhook) operations ─────────────────────────────────────
1529
1530    /// Register a LakeFS Action by uploading its YAML definition to the
1531    /// `_lakefs_actions/` namespace on the current branch, then committing.
1532    pub async fn register_action(&self, action: &LakeFSAction) -> Result<String, StorageError> {
1533        if action.name.is_empty() {
1534            return Err(StorageError::InvalidQuery(
1535                "Action name cannot be empty".to_string(),
1536            ));
1537        }
1538        if action.on.is_empty() {
1539            return Err(StorageError::InvalidQuery(
1540                "Action must have at least one event trigger".to_string(),
1541            ));
1542        }
1543        if action.hooks.is_empty() {
1544            return Err(StorageError::InvalidQuery(
1545                "Action must have at least one hook".to_string(),
1546            ));
1547        }
1548
1549        // Build the YAML content for the action
1550        let yaml_content = self.build_action_yaml(action);
1551
1552        // Upload to _lakefs_actions/ namespace
1553        let path = format!("_lakefs_actions/{}.yaml", action.name);
1554        self.upload_object(&path, yaml_content.as_bytes()).await?;
1555
1556        // Commit the action definition
1557        let commit_msg = format!("Register LakeFS Action: {}", action.name);
1558        let commit_id = self.create_commit(&commit_msg).await?;
1559
1560        Ok(commit_id)
1561    }
1562
1563    /// Build YAML string for a LakeFS Action definition.
1564    fn build_action_yaml(&self, action: &LakeFSAction) -> String {
1565        let mut yaml = String::new();
1566        yaml.push_str(&format!("name: {}\n", action.name));
1567        yaml.push_str("on:\n");
1568        for event in &action.on {
1569            yaml.push_str(&format!("  {}:\n", event));
1570        }
1571        yaml.push_str("hooks:\n");
1572        for hook in &action.hooks {
1573            yaml.push_str(&format!("  - id: {}\n", hook.id));
1574            yaml.push_str(&format!("    type: {}\n", hook.hook_type));
1575            yaml.push_str(&format!("    description: {}\n", hook.description));
1576            if !hook.properties.is_empty() {
1577                yaml.push_str("    properties:\n");
1578                for (k, v) in &hook.properties {
1579                    yaml.push_str(&format!("      {}: {}\n", k, v));
1580                }
1581            }
1582        }
1583        yaml
1584    }
1585
1586    // ── Path helpers ─────────────────────────────────────────────────────
1587
1588    /// Generate object path for snapshot.
1589    fn snapshot_path(&self, snapshot_id: &str) -> String {
1590        format!("snapshots/{}.json", snapshot_id)
1591    }
1592
1593    /// Generate object path for decision.
1594    fn decision_path(&self, decision_id: &str) -> String {
1595        format!("decisions/{}.json", decision_id)
1596    }
1597}
1598
1599// ─── StorageBackend trait impl ───────────────────────────────────────────────
1600
1601#[cfg(all(feature = "async", feature = "networking"))]
1602#[async_trait::async_trait]
1603impl StorageBackend for LakeFSBackend {
1604    async fn save(&self, snapshot: &Snapshot) -> Result<String, StorageError> {
1605        let snapshot_id = snapshot.metadata.snapshot_id.to_string();
1606
1607        // Add to pending writes instead of immediate upload
1608        {
1609            let mut pending = self.pending_writes.lock().unwrap();
1610            pending.push(snapshot.clone());
1611        }
1612
1613        Ok(snapshot_id)
1614    }
1615
1616    async fn save_decision(&self, decision: &DecisionSnapshot) -> Result<String, StorageError> {
1617        let decision_id = decision.metadata.snapshot_id.to_string();
1618        let path = self.decision_path(&decision_id);
1619
1620        let json_data = serde_json::to_vec(decision)
1621            .map_err(|e| StorageError::SerializationError(e.to_string()))?;
1622
1623        self.upload_object(&path, &json_data).await?;
1624
1625        Ok(decision_id)
1626    }
1627
1628    async fn load(&self, snapshot_id: &str) -> Result<Snapshot, StorageError> {
1629        let path = self.snapshot_path(snapshot_id);
1630        let data = self.download_object(&path).await?;
1631
1632        let snapshot: Snapshot = serde_json::from_slice(&data)
1633            .map_err(|e| StorageError::SerializationError(e.to_string()))?;
1634
1635        Ok(snapshot)
1636    }
1637
1638    async fn load_decision(&self, decision_id: &str) -> Result<DecisionSnapshot, StorageError> {
1639        let path = self.decision_path(decision_id);
1640        let data = self.download_object(&path).await?;
1641
1642        let decision: DecisionSnapshot = serde_json::from_slice(&data)
1643            .map_err(|e| StorageError::SerializationError(e.to_string()))?;
1644
1645        Ok(decision)
1646    }
1647
1648    async fn query(&self, query: SnapshotQuery) -> Result<Vec<Snapshot>, StorageError> {
1649        // List all snapshots
1650        let paths = self.list_objects("snapshots/").await?;
1651
1652        let mut snapshots = Vec::new();
1653        let mut count = 0;
1654        let offset = query.offset.unwrap_or(0);
1655        let limit = query.limit.unwrap_or(usize::MAX);
1656
1657        for path in paths {
1658            if let Some(filename) = path.split('/').next_back() {
1659                if let Some(snapshot_id) = filename.strip_suffix(".json") {
1660                    // Load snapshot to check filters
1661                    match self.load(snapshot_id).await {
1662                        Ok(snapshot) => {
1663                            // Apply filters
1664                            if self.matches_query(&snapshot, &query) {
1665                                if count >= offset {
1666                                    snapshots.push(snapshot);
1667                                    if snapshots.len() >= limit {
1668                                        break;
1669                                    }
1670                                }
1671                                count += 1;
1672                            }
1673                        }
1674                        Err(_) => continue, // Skip invalid snapshots
1675                    }
1676                }
1677            }
1678        }
1679
1680        // Sort by timestamp (newest first)
1681        snapshots.sort_by(|a, b| b.metadata.timestamp.cmp(&a.metadata.timestamp));
1682
1683        Ok(snapshots)
1684    }
1685
1686    async fn delete(&self, snapshot_id: &str) -> Result<bool, StorageError> {
1687        let path = self.snapshot_path(snapshot_id);
1688        match self.delete_object(&path).await {
1689            Ok(()) => Ok(true),
1690            Err(StorageError::NotFound(_)) => Ok(false),
1691            Err(e) => Err(e),
1692        }
1693    }
1694
1695    async fn flush(&self) -> Result<FlushResult, StorageError> {
1696        let pending_snapshots = {
1697            let mut pending = self.pending_writes.lock().unwrap();
1698            let snapshots = pending.clone();
1699            pending.clear();
1700            snapshots
1701        };
1702
1703        if pending_snapshots.is_empty() {
1704            return Ok(FlushResult {
1705                snapshots_written: 0,
1706                bytes_written: 0,
1707                checkpoint_id: None,
1708            });
1709        }
1710
1711        let mut bytes_written = 0;
1712
1713        // Upload all pending snapshots
1714        for snapshot in &pending_snapshots {
1715            let snapshot_id = snapshot.metadata.snapshot_id.to_string();
1716            let path = self.snapshot_path(&snapshot_id);
1717
1718            let json_data = serde_json::to_vec(snapshot)
1719                .map_err(|e| StorageError::SerializationError(e.to_string()))?;
1720
1721            bytes_written += json_data.len();
1722
1723            self.upload_object(&path, &json_data).await?;
1724
1725            // Also upload individual decisions
1726            for decision in &snapshot.decisions {
1727                let decision_id = decision.metadata.snapshot_id.to_string();
1728                let decision_path = self.decision_path(&decision_id);
1729
1730                let decision_data = serde_json::to_vec(decision)
1731                    .map_err(|e| StorageError::SerializationError(e.to_string()))?;
1732
1733                bytes_written += decision_data.len();
1734                self.upload_object(&decision_path, &decision_data).await?;
1735            }
1736        }
1737
1738        // Create commit
1739        let commit_message = format!("Briefcase AI flush: {} snapshots", pending_snapshots.len());
1740        let commit_id = self.create_commit(&commit_message).await?;
1741
1742        Ok(FlushResult {
1743            snapshots_written: pending_snapshots.len(),
1744            bytes_written,
1745            checkpoint_id: Some(commit_id),
1746        })
1747    }
1748
1749    async fn health_check(&self) -> Result<bool, StorageError> {
1750        let url = format!("{}/api/v1/repositories/{}", self.endpoint, self.repository);
1751
1752        let response = self
1753            .send_authed(self.client.get(&url))
1754            .await
1755            .map_err(|e| StorageError::ConnectionError(format!("Health check failed: {}", e)))?;
1756
1757        Ok(response.status().is_success())
1758    }
1759}
1760
1761#[cfg(all(feature = "async", feature = "networking"))]
1762impl LakeFSBackend {
1763    /// Check if snapshot matches query filters
1764    fn matches_query(&self, snapshot: &Snapshot, query: &SnapshotQuery) -> bool {
1765        // Check time range
1766        if let Some(start_time) = query.start_time {
1767            if snapshot.metadata.timestamp < start_time {
1768                return false;
1769            }
1770        }
1771
1772        if let Some(end_time) = query.end_time {
1773            if snapshot.metadata.timestamp > end_time {
1774                return false;
1775            }
1776        }
1777
1778        // Check function name, module name, model name, tags in decisions
1779        if query.function_name.is_some()
1780            || query.module_name.is_some()
1781            || query.model_name.is_some()
1782            || query.tags.is_some()
1783        {
1784            let mut found_match = false;
1785
1786            for decision in &snapshot.decisions {
1787                let mut decision_matches = true;
1788
1789                if let Some(function_name) = &query.function_name {
1790                    if decision.function_name != *function_name {
1791                        decision_matches = false;
1792                    }
1793                }
1794
1795                if let Some(module_name) = &query.module_name {
1796                    if decision.module_name.as_ref() != Some(module_name) {
1797                        decision_matches = false;
1798                    }
1799                }
1800
1801                if let Some(model_name) = &query.model_name {
1802                    if let Some(model_params) = &decision.model_parameters {
1803                        if model_params.model_name != *model_name {
1804                            decision_matches = false;
1805                        }
1806                    } else {
1807                        decision_matches = false;
1808                    }
1809                }
1810
1811                if let Some(query_tags) = &query.tags {
1812                    for (key, value) in query_tags {
1813                        if decision.tags.get(key) != Some(value) {
1814                            decision_matches = false;
1815                            break;
1816                        }
1817                    }
1818                }
1819
1820                if decision_matches {
1821                    found_match = true;
1822                    break;
1823                }
1824            }
1825
1826            if !found_match {
1827                return false;
1828            }
1829        }
1830
1831        true
1832    }
1833}
1834
1835// ─── Config ──────────────────────────────────────────────────────────────────
1836
1837#[derive(Debug, Clone)]
1838pub struct LakeFSConfig {
1839    pub endpoint: String,
1840    pub repository: String,
1841    pub branch: String,
1842    pub access_key: String,
1843    pub secret_key: String,
1844    /// Optional explicit auth mode.  When `None`, falls back to
1845    /// `LakeFSAuth::Basic { access_key, secret_key }`.
1846    pub auth: Option<LakeFSAuth>,
1847}
1848
1849impl LakeFSConfig {
1850    pub fn new(
1851        endpoint: impl Into<String>,
1852        repository: impl Into<String>,
1853        branch: impl Into<String>,
1854        access_key: impl Into<String>,
1855        secret_key: impl Into<String>,
1856    ) -> Self {
1857        Self {
1858            endpoint: endpoint.into(),
1859            repository: repository.into(),
1860            branch: branch.into(),
1861            access_key: access_key.into(),
1862            secret_key: secret_key.into(),
1863            auth: None,
1864        }
1865    }
1866
1867    /// Set the authentication mode explicitly.
1868    pub fn with_auth(mut self, auth: LakeFSAuth) -> Self {
1869        self.auth = Some(auth);
1870        self
1871    }
1872}
1873
1874// ─── Tests ───────────────────────────────────────────────────────────────────
1875
1876#[cfg(test)]
1877mod tests {
1878    use super::*;
1879    use crate::models::*;
1880    use serde_json::json;
1881    use wiremock::matchers::{method, path, query_param};
1882    use wiremock::{Mock, MockServer, ResponseTemplate};
1883
1884    fn create_test_config_with_endpoint(endpoint: &str) -> LakeFSConfig {
1885        LakeFSConfig::new(
1886            endpoint,
1887            "briefcase-test",
1888            "main",
1889            "test_key",
1890            "test_secret",
1891        )
1892    }
1893
1894    fn create_test_config() -> LakeFSConfig {
1895        LakeFSConfig::new(
1896            "http://localhost:8000",
1897            "briefcase-test",
1898            "main",
1899            "test_access_key",
1900            "test_secret_key",
1901        )
1902    }
1903
1904    async fn create_test_snapshot() -> Snapshot {
1905        let input = Input::new("test_input", json!("value"), "string");
1906        let output = Output::new("test_output", json!("result"), "string");
1907        let model_params = ModelParameters::new("gpt-4");
1908
1909        let decision = DecisionSnapshot::new("test_function")
1910            .with_module("test_module")
1911            .add_input(input)
1912            .add_output(output)
1913            .with_model_parameters(model_params)
1914            .add_tag("env", "test");
1915
1916        let mut snapshot = Snapshot::new(SnapshotType::Session);
1917        snapshot.add_decision(decision);
1918        snapshot
1919    }
1920
1921    // ── Existing unit tests (non-HTTP) ───────────────────────────────────
1922
1923    #[tokio::test]
1924    async fn test_lakefs_config_creation() {
1925        let config = create_test_config();
1926        assert_eq!(config.endpoint, "http://localhost:8000");
1927        assert_eq!(config.repository, "briefcase-test");
1928        assert_eq!(config.branch, "main");
1929    }
1930
1931    #[tokio::test]
1932    async fn test_object_paths() {
1933        let config = create_test_config();
1934        let backend = LakeFSBackend::new(config).unwrap();
1935
1936        let snapshot_id = "test-snapshot-123";
1937        let decision_id = "test-decision-456";
1938
1939        assert_eq!(
1940            backend.snapshot_path(snapshot_id),
1941            "snapshots/test-snapshot-123.json"
1942        );
1943        assert_eq!(
1944            backend.decision_path(decision_id),
1945            "decisions/test-decision-456.json"
1946        );
1947    }
1948
1949    #[tokio::test]
1950    async fn test_query_matching() {
1951        let config = create_test_config();
1952        let backend = LakeFSBackend::new(config).unwrap();
1953        let snapshot = create_test_snapshot().await;
1954
1955        // Test function name matching
1956        let query = SnapshotQuery::new().with_function_name("test_function");
1957        assert!(backend.matches_query(&snapshot, &query));
1958
1959        let query = SnapshotQuery::new().with_function_name("other_function");
1960        assert!(!backend.matches_query(&snapshot, &query));
1961
1962        // Test tag matching
1963        let query = SnapshotQuery::new().with_tag("env", "test");
1964        assert!(backend.matches_query(&snapshot, &query));
1965
1966        let query = SnapshotQuery::new().with_tag("env", "prod");
1967        assert!(!backend.matches_query(&snapshot, &query));
1968
1969        // Test model name matching
1970        let query = SnapshotQuery::new().with_model_name("gpt-4");
1971        assert!(backend.matches_query(&snapshot, &query));
1972
1973        let query = SnapshotQuery::new().with_model_name("claude-3");
1974        assert!(!backend.matches_query(&snapshot, &query));
1975    }
1976
1977    #[tokio::test]
1978    async fn test_pending_writes() {
1979        let config = create_test_config();
1980        let backend = LakeFSBackend::new(config).unwrap();
1981        let snapshot = create_test_snapshot().await;
1982
1983        // Save should add to pending writes
1984        let snapshot_id = backend.save(&snapshot).await.unwrap();
1985        assert_eq!(snapshot_id, snapshot.metadata.snapshot_id.to_string());
1986
1987        // Check pending writes
1988        {
1989            let pending = backend.pending_writes.lock().unwrap();
1990            assert_eq!(pending.len(), 1);
1991        }
1992    }
1993
1994    #[tokio::test]
1995    async fn test_accessors() {
1996        let config = create_test_config();
1997        let backend = LakeFSBackend::new(config).unwrap();
1998
1999        assert_eq!(backend.repository(), "briefcase-test");
2000        assert_eq!(backend.branch(), "main");
2001        assert_eq!(backend.endpoint(), "http://localhost:8000");
2002    }
2003
2004    // ── Edge cases (input validation) ────────────────────────────────────
2005
2006    #[tokio::test]
2007    async fn test_create_repository_empty_name() {
2008        let config = create_test_config();
2009        let backend = LakeFSBackend::new(config).unwrap();
2010
2011        let result = backend.create_repository("", "s3://bucket", None).await;
2012        assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2013    }
2014
2015    #[tokio::test]
2016    async fn test_create_repository_empty_namespace() {
2017        let config = create_test_config();
2018        let backend = LakeFSBackend::new(config).unwrap();
2019
2020        let result = backend.create_repository("my-repo", "", None).await;
2021        assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2022    }
2023
2024    #[tokio::test]
2025    async fn test_delete_repository_empty_name() {
2026        let config = create_test_config();
2027        let backend = LakeFSBackend::new(config).unwrap();
2028
2029        let result = backend.delete_repository("").await;
2030        assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2031    }
2032
2033    #[tokio::test]
2034    async fn test_create_branch_empty_name() {
2035        let config = create_test_config();
2036        let backend = LakeFSBackend::new(config).unwrap();
2037
2038        let result = backend.create_branch("", "main").await;
2039        assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2040    }
2041
2042    #[tokio::test]
2043    async fn test_create_branch_empty_source() {
2044        let config = create_test_config();
2045        let backend = LakeFSBackend::new(config).unwrap();
2046
2047        let result = backend.create_branch("feature", "").await;
2048        assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2049    }
2050
2051    #[tokio::test]
2052    async fn test_delete_branch_empty_name() {
2053        let config = create_test_config();
2054        let backend = LakeFSBackend::new(config).unwrap();
2055
2056        let result = backend.delete_branch("").await;
2057        assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2058    }
2059
2060    #[tokio::test]
2061    async fn test_merge_branch_empty_source() {
2062        let config = create_test_config();
2063        let backend = LakeFSBackend::new(config).unwrap();
2064
2065        let result = backend.merge_branch("", "main", None).await;
2066        assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2067    }
2068
2069    #[tokio::test]
2070    async fn test_merge_branch_empty_destination() {
2071        let config = create_test_config();
2072        let backend = LakeFSBackend::new(config).unwrap();
2073
2074        let result = backend.merge_branch("feature", "", None).await;
2075        assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2076    }
2077
2078    #[tokio::test]
2079    async fn test_get_commit_empty_id() {
2080        let config = create_test_config();
2081        let backend = LakeFSBackend::new(config).unwrap();
2082
2083        let result = backend.get_commit("").await;
2084        assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2085    }
2086
2087    #[tokio::test]
2088    async fn test_diff_refs_empty_left() {
2089        let config = create_test_config();
2090        let backend = LakeFSBackend::new(config).unwrap();
2091
2092        let result = backend.diff_refs("", "main").await;
2093        assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2094    }
2095
2096    #[tokio::test]
2097    async fn test_diff_refs_empty_right() {
2098        let config = create_test_config();
2099        let backend = LakeFSBackend::new(config).unwrap();
2100
2101        let result = backend.diff_refs("feature", "").await;
2102        assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2103    }
2104
2105    #[tokio::test]
2106    async fn test_delete_object_empty_path() {
2107        let config = create_test_config();
2108        let backend = LakeFSBackend::new(config).unwrap();
2109
2110        let result = backend.delete_object("").await;
2111        assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2112    }
2113
2114    #[tokio::test]
2115    async fn test_get_object_stats_empty_path() {
2116        let config = create_test_config();
2117        let backend = LakeFSBackend::new(config).unwrap();
2118
2119        let result = backend.get_object_stats("").await;
2120        assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2121    }
2122
2123    #[tokio::test]
2124    async fn test_register_action_empty_name() {
2125        let config = create_test_config();
2126        let backend = LakeFSBackend::new(config).unwrap();
2127
2128        let action = LakeFSAction {
2129            name: "".to_string(),
2130            on: vec!["post-commit".to_string()],
2131            hooks: vec![ActionHook {
2132                id: "hook1".to_string(),
2133                hook_type: "webhook".to_string(),
2134                description: "test".to_string(),
2135                properties: HashMap::new(),
2136            }],
2137        };
2138
2139        let result = backend.register_action(&action).await;
2140        assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2141    }
2142
2143    #[tokio::test]
2144    async fn test_register_action_no_events() {
2145        let config = create_test_config();
2146        let backend = LakeFSBackend::new(config).unwrap();
2147
2148        let action = LakeFSAction {
2149            name: "test-action".to_string(),
2150            on: vec![],
2151            hooks: vec![ActionHook {
2152                id: "hook1".to_string(),
2153                hook_type: "webhook".to_string(),
2154                description: "test".to_string(),
2155                properties: HashMap::new(),
2156            }],
2157        };
2158
2159        let result = backend.register_action(&action).await;
2160        assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2161    }
2162
2163    #[tokio::test]
2164    async fn test_register_action_no_hooks() {
2165        let config = create_test_config();
2166        let backend = LakeFSBackend::new(config).unwrap();
2167
2168        let action = LakeFSAction {
2169            name: "test-action".to_string(),
2170            on: vec!["post-commit".to_string()],
2171            hooks: vec![],
2172        };
2173
2174        let result = backend.register_action(&action).await;
2175        assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2176    }
2177
2178    // ── YAML builder tests ───────────────────────────────────────────────
2179
2180    #[tokio::test]
2181    async fn test_build_action_yaml() {
2182        let config = create_test_config();
2183        let backend = LakeFSBackend::new(config).unwrap();
2184
2185        let mut props = HashMap::new();
2186        props.insert("url".to_string(), "http://localhost:9000/hook".to_string());
2187
2188        let action = LakeFSAction {
2189            name: "briefcase-post-commit".to_string(),
2190            on: vec!["post-commit".to_string()],
2191            hooks: vec![ActionHook {
2192                id: "notify".to_string(),
2193                hook_type: "webhook".to_string(),
2194                description: "Notify Briefcase server".to_string(),
2195                properties: props,
2196            }],
2197        };
2198
2199        let yaml = backend.build_action_yaml(&action);
2200        assert!(yaml.contains("name: briefcase-post-commit"));
2201        assert!(yaml.contains("post-commit:"));
2202        assert!(yaml.contains("id: notify"));
2203        assert!(yaml.contains("type: webhook"));
2204        assert!(yaml.contains("url: http://localhost:9000/hook"));
2205    }
2206
2207    #[tokio::test]
2208    async fn test_build_action_yaml_multiple_events() {
2209        let config = create_test_config();
2210        let backend = LakeFSBackend::new(config).unwrap();
2211
2212        let action = LakeFSAction {
2213            name: "multi-event".to_string(),
2214            on: vec!["pre-commit".to_string(), "post-merge".to_string()],
2215            hooks: vec![ActionHook {
2216                id: "h1".to_string(),
2217                hook_type: "webhook".to_string(),
2218                description: "Hook".to_string(),
2219                properties: HashMap::new(),
2220            }],
2221        };
2222
2223        let yaml = backend.build_action_yaml(&action);
2224        assert!(yaml.contains("pre-commit:"));
2225        assert!(yaml.contains("post-merge:"));
2226    }
2227
2228    // ── Flush edge case: empty pending writes ────────────────────────────
2229
2230    #[tokio::test]
2231    async fn test_flush_empty_pending() {
2232        let config = create_test_config();
2233        let backend = LakeFSBackend::new(config).unwrap();
2234
2235        let result = backend.flush().await.unwrap();
2236        assert_eq!(result.snapshots_written, 0);
2237        assert_eq!(result.bytes_written, 0);
2238        assert!(result.checkpoint_id.is_none());
2239    }
2240
2241    // ── HTTP mock tests (wiremock) ───────────────────────────────────────
2242
2243    #[tokio::test]
2244    async fn test_create_repository_success() {
2245        let mock_server = MockServer::start().await;
2246        let config = create_test_config_with_endpoint(&mock_server.uri());
2247        let backend = LakeFSBackend::new(config).unwrap();
2248
2249        Mock::given(method("POST"))
2250            .and(path("/api/v1/repositories"))
2251            .respond_with(ResponseTemplate::new(201).set_body_json(json!({
2252                "id": "my-engagement",
2253                "storage_namespace": "s3://my-bucket/engagement",
2254                "default_branch": "main",
2255                "creation_date": 1700000000
2256            })))
2257            .expect(1)
2258            .mount(&mock_server)
2259            .await;
2260
2261        let result = backend
2262            .create_repository("my-engagement", "s3://my-bucket/engagement", None)
2263            .await
2264            .unwrap();
2265
2266        assert_eq!(result.id, "my-engagement");
2267        assert_eq!(result.storage_namespace, "s3://my-bucket/engagement");
2268        assert_eq!(result.default_branch, "main");
2269    }
2270
2271    #[tokio::test]
2272    async fn test_create_repository_conflict() {
2273        let mock_server = MockServer::start().await;
2274        let config = create_test_config_with_endpoint(&mock_server.uri());
2275        let backend = LakeFSBackend::new(config).unwrap();
2276
2277        Mock::given(method("POST"))
2278            .and(path("/api/v1/repositories"))
2279            .respond_with(ResponseTemplate::new(409).set_body_json(json!({
2280                "message": "repository already exists"
2281            })))
2282            .expect(1)
2283            .mount(&mock_server)
2284            .await;
2285
2286        let result = backend
2287            .create_repository("existing-repo", "s3://bucket", None)
2288            .await;
2289
2290        assert!(result.is_err());
2291        let err = result.unwrap_err();
2292        assert!(
2293            matches!(err, StorageError::ConnectionError(msg) if msg.contains("already exists"))
2294        );
2295    }
2296
2297    #[tokio::test]
2298    async fn test_create_repository_custom_branch() {
2299        let mock_server = MockServer::start().await;
2300        let config = create_test_config_with_endpoint(&mock_server.uri());
2301        let backend = LakeFSBackend::new(config).unwrap();
2302
2303        Mock::given(method("POST"))
2304            .and(path("/api/v1/repositories"))
2305            .respond_with(ResponseTemplate::new(201).set_body_json(json!({
2306                "id": "my-repo",
2307                "storage_namespace": "s3://bucket",
2308                "default_branch": "develop",
2309                "creation_date": 1700000000
2310            })))
2311            .expect(1)
2312            .mount(&mock_server)
2313            .await;
2314
2315        let result = backend
2316            .create_repository("my-repo", "s3://bucket", Some("develop"))
2317            .await
2318            .unwrap();
2319
2320        assert_eq!(result.default_branch, "develop");
2321    }
2322
2323    #[tokio::test]
2324    async fn test_delete_repository_success() {
2325        let mock_server = MockServer::start().await;
2326        let config = create_test_config_with_endpoint(&mock_server.uri());
2327        let backend = LakeFSBackend::new(config).unwrap();
2328
2329        Mock::given(method("DELETE"))
2330            .and(path("/api/v1/repositories/test-repo"))
2331            .respond_with(ResponseTemplate::new(204))
2332            .expect(1)
2333            .mount(&mock_server)
2334            .await;
2335
2336        backend.delete_repository("test-repo").await.unwrap();
2337    }
2338
2339    #[tokio::test]
2340    async fn test_delete_repository_not_found() {
2341        let mock_server = MockServer::start().await;
2342        let config = create_test_config_with_endpoint(&mock_server.uri());
2343        let backend = LakeFSBackend::new(config).unwrap();
2344
2345        Mock::given(method("DELETE"))
2346            .and(path("/api/v1/repositories/nonexistent"))
2347            .respond_with(ResponseTemplate::new(404))
2348            .expect(1)
2349            .mount(&mock_server)
2350            .await;
2351
2352        let result = backend.delete_repository("nonexistent").await;
2353        assert!(matches!(result, Err(StorageError::NotFound(_))));
2354    }
2355
2356    #[tokio::test]
2357    async fn test_create_branch_success() {
2358        let mock_server = MockServer::start().await;
2359        let config = create_test_config_with_endpoint(&mock_server.uri());
2360        let backend = LakeFSBackend::new(config).unwrap();
2361
2362        Mock::given(method("POST"))
2363            .and(path("/api/v1/repositories/briefcase-test/branches"))
2364            .respond_with(ResponseTemplate::new(201).set_body_string("\"abc123def456\""))
2365            .expect(1)
2366            .mount(&mock_server)
2367            .await;
2368
2369        let commit_id = backend.create_branch("feature-x", "main").await.unwrap();
2370        assert_eq!(commit_id, "abc123def456");
2371    }
2372
2373    #[tokio::test]
2374    async fn test_create_branch_conflict() {
2375        let mock_server = MockServer::start().await;
2376        let config = create_test_config_with_endpoint(&mock_server.uri());
2377        let backend = LakeFSBackend::new(config).unwrap();
2378
2379        Mock::given(method("POST"))
2380            .and(path("/api/v1/repositories/briefcase-test/branches"))
2381            .respond_with(ResponseTemplate::new(409))
2382            .expect(1)
2383            .mount(&mock_server)
2384            .await;
2385
2386        let result = backend.create_branch("existing-branch", "main").await;
2387        assert!(result.is_err());
2388    }
2389
2390    #[tokio::test]
2391    async fn test_delete_branch_success() {
2392        let mock_server = MockServer::start().await;
2393        let config = create_test_config_with_endpoint(&mock_server.uri());
2394        let backend = LakeFSBackend::new(config).unwrap();
2395
2396        Mock::given(method("DELETE"))
2397            .and(path(
2398                "/api/v1/repositories/briefcase-test/branches/feature-x",
2399            ))
2400            .respond_with(ResponseTemplate::new(204))
2401            .expect(1)
2402            .mount(&mock_server)
2403            .await;
2404
2405        backend.delete_branch("feature-x").await.unwrap();
2406    }
2407
2408    #[tokio::test]
2409    async fn test_delete_branch_not_found() {
2410        let mock_server = MockServer::start().await;
2411        let config = create_test_config_with_endpoint(&mock_server.uri());
2412        let backend = LakeFSBackend::new(config).unwrap();
2413
2414        Mock::given(method("DELETE"))
2415            .and(path(
2416                "/api/v1/repositories/briefcase-test/branches/nonexistent",
2417            ))
2418            .respond_with(ResponseTemplate::new(404))
2419            .expect(1)
2420            .mount(&mock_server)
2421            .await;
2422
2423        let result = backend.delete_branch("nonexistent").await;
2424        assert!(matches!(result, Err(StorageError::NotFound(_))));
2425    }
2426
2427    #[tokio::test]
2428    async fn test_merge_branch_success() {
2429        let mock_server = MockServer::start().await;
2430        let config = create_test_config_with_endpoint(&mock_server.uri());
2431        let backend = LakeFSBackend::new(config).unwrap();
2432
2433        Mock::given(method("POST"))
2434            .and(path(
2435                "/api/v1/repositories/briefcase-test/refs/feature-x/merge/main",
2436            ))
2437            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2438                "reference": "merge-commit-abc123",
2439                "summary": {"added": 3, "removed": 0, "changed": 1}
2440            })))
2441            .expect(1)
2442            .mount(&mock_server)
2443            .await;
2444
2445        let result = backend
2446            .merge_branch("feature-x", "main", Some("Merge feature-x"))
2447            .await
2448            .unwrap();
2449
2450        assert_eq!(result.commit_id, "merge-commit-abc123");
2451        assert!(result.summary.contains("added"));
2452    }
2453
2454    #[tokio::test]
2455    async fn test_merge_branch_conflict() {
2456        let mock_server = MockServer::start().await;
2457        let config = create_test_config_with_endpoint(&mock_server.uri());
2458        let backend = LakeFSBackend::new(config).unwrap();
2459
2460        Mock::given(method("POST"))
2461            .and(path(
2462                "/api/v1/repositories/briefcase-test/refs/feature-x/merge/main",
2463            ))
2464            .respond_with(ResponseTemplate::new(409).set_body_json(json!({
2465                "message": "conflict"
2466            })))
2467            .expect(1)
2468            .mount(&mock_server)
2469            .await;
2470
2471        let result = backend.merge_branch("feature-x", "main", None).await;
2472        assert!(result.is_err());
2473        let err = result.unwrap_err();
2474        assert!(matches!(err, StorageError::ConnectionError(msg) if msg.contains("conflict")));
2475    }
2476
2477    #[tokio::test]
2478    async fn test_merge_branch_default_message() {
2479        let mock_server = MockServer::start().await;
2480        let config = create_test_config_with_endpoint(&mock_server.uri());
2481        let backend = LakeFSBackend::new(config).unwrap();
2482
2483        Mock::given(method("POST"))
2484            .and(path(
2485                "/api/v1/repositories/briefcase-test/refs/src/merge/dst",
2486            ))
2487            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2488                "reference": "commit-xyz",
2489                "summary": {}
2490            })))
2491            .expect(1)
2492            .mount(&mock_server)
2493            .await;
2494
2495        let result = backend.merge_branch("src", "dst", None).await.unwrap();
2496        assert_eq!(result.commit_id, "commit-xyz");
2497    }
2498
2499    #[tokio::test]
2500    async fn test_get_commit_success() {
2501        let mock_server = MockServer::start().await;
2502        let config = create_test_config_with_endpoint(&mock_server.uri());
2503        let backend = LakeFSBackend::new(config).unwrap();
2504
2505        Mock::given(method("GET"))
2506            .and(path("/api/v1/repositories/briefcase-test/commits/abc123"))
2507            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2508                "id": "abc123",
2509                "message": "Initial commit",
2510                "committer": "admin",
2511                "metadata": {"source": "briefcase-ai"},
2512                "creation_date": 1700000000
2513            })))
2514            .expect(1)
2515            .mount(&mock_server)
2516            .await;
2517
2518        let commit = backend.get_commit("abc123").await.unwrap();
2519        assert_eq!(commit.id, "abc123");
2520        assert_eq!(commit.message, "Initial commit");
2521        assert_eq!(commit.committer, "admin");
2522        assert_eq!(
2523            commit.metadata.get("source"),
2524            Some(&"briefcase-ai".to_string())
2525        );
2526    }
2527
2528    #[tokio::test]
2529    async fn test_get_commit_not_found() {
2530        let mock_server = MockServer::start().await;
2531        let config = create_test_config_with_endpoint(&mock_server.uri());
2532        let backend = LakeFSBackend::new(config).unwrap();
2533
2534        Mock::given(method("GET"))
2535            .and(path(
2536                "/api/v1/repositories/briefcase-test/commits/nonexistent",
2537            ))
2538            .respond_with(ResponseTemplate::new(404))
2539            .expect(1)
2540            .mount(&mock_server)
2541            .await;
2542
2543        let result = backend.get_commit("nonexistent").await;
2544        assert!(matches!(result, Err(StorageError::NotFound(_))));
2545    }
2546
2547    #[tokio::test]
2548    async fn test_diff_refs_success() {
2549        let mock_server = MockServer::start().await;
2550        let config = create_test_config_with_endpoint(&mock_server.uri());
2551        let backend = LakeFSBackend::new(config).unwrap();
2552
2553        Mock::given(method("GET"))
2554            .and(path(
2555                "/api/v1/repositories/briefcase-test/refs/main/diff/feature-x",
2556            ))
2557            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2558                "results": [
2559                    {"path": "snapshots/a.json", "type": "added", "size_bytes": 1024},
2560                    {"path": "snapshots/b.json", "type": "changed", "size_bytes": 2048},
2561                    {"path": "decisions/old.json", "type": "removed"}
2562                ]
2563            })))
2564            .expect(1)
2565            .mount(&mock_server)
2566            .await;
2567
2568        let diffs = backend.diff_refs("main", "feature-x").await.unwrap();
2569        assert_eq!(diffs.len(), 3);
2570        assert_eq!(diffs[0].path, "snapshots/a.json");
2571        assert_eq!(diffs[0].diff_type, "added");
2572        assert_eq!(diffs[0].size_bytes, Some(1024));
2573        assert_eq!(diffs[1].diff_type, "changed");
2574        assert_eq!(diffs[2].diff_type, "removed");
2575        assert_eq!(diffs[2].size_bytes, None);
2576    }
2577
2578    #[tokio::test]
2579    async fn test_diff_refs_empty_result() {
2580        let mock_server = MockServer::start().await;
2581        let config = create_test_config_with_endpoint(&mock_server.uri());
2582        let backend = LakeFSBackend::new(config).unwrap();
2583
2584        Mock::given(method("GET"))
2585            .and(path(
2586                "/api/v1/repositories/briefcase-test/refs/main/diff/main",
2587            ))
2588            .respond_with(ResponseTemplate::new(200).set_body_json(json!({"results": []})))
2589            .expect(1)
2590            .mount(&mock_server)
2591            .await;
2592
2593        let diffs = backend.diff_refs("main", "main").await.unwrap();
2594        assert!(diffs.is_empty());
2595    }
2596
2597    #[tokio::test]
2598    async fn test_get_object_stats_success() {
2599        let mock_server = MockServer::start().await;
2600        let config = create_test_config_with_endpoint(&mock_server.uri());
2601        let backend = LakeFSBackend::new(config).unwrap();
2602
2603        Mock::given(method("GET"))
2604            .and(path(
2605                "/api/v1/repositories/briefcase-test/refs/main/objects/stat",
2606            ))
2607            .and(query_param("path", "snapshots/test.json"))
2608            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2609                "path": "snapshots/test.json",
2610                "physical_address": "s3://bucket/data/abc",
2611                "size_bytes": 4096,
2612                "content_type": "application/json",
2613                "mtime": 1700000000,
2614                "checksum": "sha256:abcdef"
2615            })))
2616            .expect(1)
2617            .mount(&mock_server)
2618            .await;
2619
2620        let stats = backend
2621            .get_object_stats("snapshots/test.json")
2622            .await
2623            .unwrap();
2624
2625        assert_eq!(stats.path, "snapshots/test.json");
2626        assert_eq!(stats.size_bytes, 4096);
2627        assert_eq!(stats.content_type, "application/json");
2628        assert_eq!(stats.checksum, "sha256:abcdef");
2629    }
2630
2631    #[tokio::test]
2632    async fn test_get_object_stats_not_found() {
2633        let mock_server = MockServer::start().await;
2634        let config = create_test_config_with_endpoint(&mock_server.uri());
2635        let backend = LakeFSBackend::new(config).unwrap();
2636
2637        Mock::given(method("GET"))
2638            .and(path(
2639                "/api/v1/repositories/briefcase-test/refs/main/objects/stat",
2640            ))
2641            .respond_with(ResponseTemplate::new(404))
2642            .expect(1)
2643            .mount(&mock_server)
2644            .await;
2645
2646        let result = backend.get_object_stats("nonexistent.json").await;
2647        assert!(matches!(result, Err(StorageError::NotFound(_))));
2648    }
2649
2650    #[tokio::test]
2651    async fn test_delete_object_success() {
2652        let mock_server = MockServer::start().await;
2653        let config = create_test_config_with_endpoint(&mock_server.uri());
2654        let backend = LakeFSBackend::new(config).unwrap();
2655
2656        Mock::given(method("DELETE"))
2657            .and(path(
2658                "/api/v1/repositories/briefcase-test/branches/main/objects",
2659            ))
2660            .and(query_param("path", "snapshots/old.json"))
2661            .respond_with(ResponseTemplate::new(204))
2662            .expect(1)
2663            .mount(&mock_server)
2664            .await;
2665
2666        backend.delete_object("snapshots/old.json").await.unwrap();
2667    }
2668
2669    #[tokio::test]
2670    async fn test_delete_object_not_found() {
2671        let mock_server = MockServer::start().await;
2672        let config = create_test_config_with_endpoint(&mock_server.uri());
2673        let backend = LakeFSBackend::new(config).unwrap();
2674
2675        Mock::given(method("DELETE"))
2676            .and(path(
2677                "/api/v1/repositories/briefcase-test/branches/main/objects",
2678            ))
2679            .respond_with(ResponseTemplate::new(404))
2680            .expect(1)
2681            .mount(&mock_server)
2682            .await;
2683
2684        let result = backend.delete_object("nonexistent.json").await;
2685        assert!(matches!(result, Err(StorageError::NotFound(_))));
2686    }
2687
2688    #[tokio::test]
2689    async fn test_upload_object_success() {
2690        let mock_server = MockServer::start().await;
2691        let config = create_test_config_with_endpoint(&mock_server.uri());
2692        let backend = LakeFSBackend::new(config).unwrap();
2693
2694        Mock::given(method("PUT"))
2695            .and(path(
2696                "/api/v1/repositories/briefcase-test/branches/main/objects",
2697            ))
2698            .and(query_param("path", "test/file.json"))
2699            .respond_with(ResponseTemplate::new(201))
2700            .expect(1)
2701            .mount(&mock_server)
2702            .await;
2703
2704        backend
2705            .upload_object("test/file.json", b"hello world")
2706            .await
2707            .unwrap();
2708    }
2709
2710    #[tokio::test]
2711    async fn test_upload_object_server_error() {
2712        let mock_server = MockServer::start().await;
2713        let config = create_test_config_with_endpoint(&mock_server.uri());
2714        let backend = LakeFSBackend::new(config).unwrap();
2715
2716        Mock::given(method("PUT"))
2717            .and(path(
2718                "/api/v1/repositories/briefcase-test/branches/main/objects",
2719            ))
2720            .respond_with(ResponseTemplate::new(500).set_body_string("Internal Server Error"))
2721            .expect(1)
2722            .mount(&mock_server)
2723            .await;
2724
2725        let result = backend.upload_object("test/file.json", b"data").await;
2726        assert!(result.is_err());
2727    }
2728
2729    #[tokio::test]
2730    async fn test_download_object_success() {
2731        let mock_server = MockServer::start().await;
2732        let config = create_test_config_with_endpoint(&mock_server.uri());
2733        let backend = LakeFSBackend::new(config).unwrap();
2734
2735        Mock::given(method("GET"))
2736            .and(path(
2737                "/api/v1/repositories/briefcase-test/refs/main/objects",
2738            ))
2739            .and(query_param("path", "test/file.json"))
2740            .respond_with(ResponseTemplate::new(200).set_body_bytes(b"file content here".to_vec()))
2741            .expect(1)
2742            .mount(&mock_server)
2743            .await;
2744
2745        let data = backend.download_object("test/file.json").await.unwrap();
2746        assert_eq!(data, b"file content here");
2747    }
2748
2749    #[tokio::test]
2750    async fn test_download_object_not_found() {
2751        let mock_server = MockServer::start().await;
2752        let config = create_test_config_with_endpoint(&mock_server.uri());
2753        let backend = LakeFSBackend::new(config).unwrap();
2754
2755        Mock::given(method("GET"))
2756            .and(path(
2757                "/api/v1/repositories/briefcase-test/refs/main/objects",
2758            ))
2759            .respond_with(ResponseTemplate::new(404))
2760            .expect(1)
2761            .mount(&mock_server)
2762            .await;
2763
2764        let result = backend.download_object("missing.json").await;
2765        assert!(matches!(result, Err(StorageError::NotFound(_))));
2766    }
2767
2768    #[tokio::test]
2769    async fn test_create_commit_success() {
2770        let mock_server = MockServer::start().await;
2771        let config = create_test_config_with_endpoint(&mock_server.uri());
2772        let backend = LakeFSBackend::new(config).unwrap();
2773
2774        Mock::given(method("POST"))
2775            .and(path(
2776                "/api/v1/repositories/briefcase-test/branches/main/commits",
2777            ))
2778            .respond_with(ResponseTemplate::new(201).set_body_json(json!({
2779                "id": "commit-sha-xyz"
2780            })))
2781            .expect(1)
2782            .mount(&mock_server)
2783            .await;
2784
2785        let id = backend.create_commit("Test commit").await.unwrap();
2786        assert_eq!(id, "commit-sha-xyz");
2787    }
2788
2789    #[tokio::test]
2790    async fn test_list_objects_success() {
2791        let mock_server = MockServer::start().await;
2792        let config = create_test_config_with_endpoint(&mock_server.uri());
2793        let backend = LakeFSBackend::new(config).unwrap();
2794
2795        Mock::given(method("GET"))
2796            .and(path(
2797                "/api/v1/repositories/briefcase-test/refs/main/objects/ls",
2798            ))
2799            .and(query_param("prefix", "snapshots/"))
2800            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2801                "results": [
2802                    {"path": "snapshots/a.json", "type": "object"},
2803                    {"path": "snapshots/b.json", "type": "object"},
2804                    {"path": "snapshots/", "type": "common_prefix"}
2805                ]
2806            })))
2807            .expect(1)
2808            .mount(&mock_server)
2809            .await;
2810
2811        let paths = backend.list_objects("snapshots/").await.unwrap();
2812        assert_eq!(paths.len(), 2);
2813        assert!(paths.contains(&"snapshots/a.json".to_string()));
2814        assert!(paths.contains(&"snapshots/b.json".to_string()));
2815    }
2816
2817    #[tokio::test]
2818    async fn test_health_check_healthy() {
2819        let mock_server = MockServer::start().await;
2820        let config = create_test_config_with_endpoint(&mock_server.uri());
2821        let backend = LakeFSBackend::new(config).unwrap();
2822
2823        Mock::given(method("GET"))
2824            .and(path("/api/v1/repositories/briefcase-test"))
2825            .respond_with(ResponseTemplate::new(200))
2826            .expect(1)
2827            .mount(&mock_server)
2828            .await;
2829
2830        assert!(backend.health_check().await.unwrap());
2831    }
2832
2833    #[tokio::test]
2834    async fn test_health_check_unhealthy() {
2835        let mock_server = MockServer::start().await;
2836        let config = create_test_config_with_endpoint(&mock_server.uri());
2837        let backend = LakeFSBackend::new(config).unwrap();
2838
2839        Mock::given(method("GET"))
2840            .and(path("/api/v1/repositories/briefcase-test"))
2841            .respond_with(ResponseTemplate::new(503))
2842            .expect(1)
2843            .mount(&mock_server)
2844            .await;
2845
2846        assert!(!backend.health_check().await.unwrap());
2847    }
2848
2849    #[tokio::test]
2850    async fn test_delete_via_trait_success() {
2851        let mock_server = MockServer::start().await;
2852        let config = create_test_config_with_endpoint(&mock_server.uri());
2853        let backend = LakeFSBackend::new(config).unwrap();
2854
2855        Mock::given(method("DELETE"))
2856            .and(path(
2857                "/api/v1/repositories/briefcase-test/branches/main/objects",
2858            ))
2859            .respond_with(ResponseTemplate::new(204))
2860            .expect(1)
2861            .mount(&mock_server)
2862            .await;
2863
2864        let deleted = backend.delete("some-snapshot-id").await.unwrap();
2865        assert!(deleted);
2866    }
2867
2868    #[tokio::test]
2869    async fn test_delete_via_trait_not_found() {
2870        let mock_server = MockServer::start().await;
2871        let config = create_test_config_with_endpoint(&mock_server.uri());
2872        let backend = LakeFSBackend::new(config).unwrap();
2873
2874        Mock::given(method("DELETE"))
2875            .and(path(
2876                "/api/v1/repositories/briefcase-test/branches/main/objects",
2877            ))
2878            .respond_with(ResponseTemplate::new(404))
2879            .expect(1)
2880            .mount(&mock_server)
2881            .await;
2882
2883        let deleted = backend.delete("nonexistent").await.unwrap();
2884        assert!(!deleted);
2885    }
2886
2887    // Note: Integration tests against real LakeFS would require a running instance
2888    // These would be better suited for a separate integration test suite
2889
2890    // ─── Auth provider tests ────────────────────────────────────────────────
2891
2892    #[test]
2893    fn test_lakefs_auth_enum_basic() {
2894        let auth = LakeFSAuth::Basic {
2895            access_key: "ak".into(),
2896            secret_key: "sk".into(),
2897        };
2898        match auth {
2899            LakeFSAuth::Basic {
2900                access_key,
2901                secret_key,
2902            } => {
2903                assert_eq!(access_key, "ak");
2904                assert_eq!(secret_key, "sk");
2905            }
2906            _ => panic!("Expected Basic"),
2907        }
2908    }
2909
2910    #[test]
2911    fn test_lakefs_auth_enum_all_variants() {
2912        // Ensure all 6 variants construct without panic.
2913        let _basic = LakeFSAuth::Basic {
2914            access_key: "a".into(),
2915            secret_key: "s".into(),
2916        };
2917        let _token = LakeFSAuth::Token {
2918            access_key: "a".into(),
2919            secret_key: "s".into(),
2920        };
2921        let _sts = LakeFSAuth::Sts {
2922            oidc_token: "tok".into(),
2923        };
2924        let _iam = LakeFSAuth::Iam;
2925        let _oidc = LakeFSAuth::Oidc {
2926            token: "tok".into(),
2927        };
2928        let _saml = LakeFSAuth::Saml {
2929            assertion: "xml".into(),
2930        };
2931    }
2932
2933    #[test]
2934    fn test_lakefs_config_with_auth() {
2935        let config = LakeFSConfig::new("http://localhost", "repo", "main", "ak", "sk").with_auth(
2936            LakeFSAuth::Token {
2937                access_key: "ak2".into(),
2938                secret_key: "sk2".into(),
2939            },
2940        );
2941        assert!(config.auth.is_some());
2942        match config.auth.unwrap() {
2943            LakeFSAuth::Token {
2944                access_key,
2945                secret_key,
2946            } => {
2947                assert_eq!(access_key, "ak2");
2948                assert_eq!(secret_key, "sk2");
2949            }
2950            _ => panic!("Expected Token"),
2951        }
2952    }
2953
2954    #[test]
2955    fn test_lakefs_config_without_auth_defaults_to_none() {
2956        let config = LakeFSConfig::new("http://localhost", "repo", "main", "ak", "sk");
2957        assert!(config.auth.is_none());
2958    }
2959
2960    #[tokio::test]
2961    async fn test_basic_auth_provider_header() {
2962        let provider = BasicAuthProvider::new(
2963            "AKIAIOSFODNN7EXAMPLE",
2964            "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
2965        )
2966        .unwrap();
2967        let header = provider.auth_header().await.unwrap();
2968        let header_str = header.to_str().unwrap();
2969        assert!(header_str.starts_with("Basic "));
2970        // Verify it's valid base64
2971        let b64_part = &header_str["Basic ".len()..];
2972        let decoded = general_purpose::STANDARD.decode(b64_part).unwrap();
2973        let decoded_str = String::from_utf8(decoded).unwrap();
2974        assert_eq!(
2975            decoded_str,
2976            "AKIAIOSFODNN7EXAMPLE:wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
2977        );
2978    }
2979
2980    #[tokio::test]
2981    async fn test_basic_auth_provider_mode_name() {
2982        let provider = BasicAuthProvider::new("ak", "sk").unwrap();
2983        assert_eq!(provider.mode_name(), "basic");
2984    }
2985
2986    #[tokio::test]
2987    async fn test_token_auth_provider_login_success() {
2988        let mock_server = MockServer::start().await;
2989
2990        // Mock the login endpoint
2991        Mock::given(method("POST"))
2992            .and(path("/api/v1/auth/login"))
2993            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2994                "token": "eyJhbGciOiJIUzI1NiJ9.test",
2995                "token_expiration": 9999999999u64
2996            })))
2997            .expect(1)
2998            .mount(&mock_server)
2999            .await;
3000
3001        let provider = TokenAuthProvider::new(&mock_server.uri(), "ak", "sk");
3002        let header = provider.auth_header().await.unwrap();
3003        let header_str = header.to_str().unwrap();
3004        assert_eq!(header_str, "Bearer eyJhbGciOiJIUzI1NiJ9.test");
3005        assert_eq!(provider.mode_name(), "token");
3006    }
3007
3008    #[tokio::test]
3009    async fn test_token_auth_provider_caches_token() {
3010        let mock_server = MockServer::start().await;
3011
3012        Mock::given(method("POST"))
3013            .and(path("/api/v1/auth/login"))
3014            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
3015                "token": "cached-token",
3016                "token_expiration": 9999999999u64
3017            })))
3018            .expect(1)  // Should only be called once
3019            .mount(&mock_server)
3020            .await;
3021
3022        let provider = TokenAuthProvider::new(&mock_server.uri(), "ak", "sk");
3023
3024        // First call — fetches token
3025        let h1 = provider.auth_header().await.unwrap();
3026        // Second call — cache hit
3027        let h2 = provider.auth_header().await.unwrap();
3028        assert_eq!(h1, h2);
3029    }
3030
3031    #[tokio::test]
3032    async fn test_token_auth_provider_login_failure() {
3033        let mock_server = MockServer::start().await;
3034
3035        Mock::given(method("POST"))
3036            .and(path("/api/v1/auth/login"))
3037            .respond_with(ResponseTemplate::new(401).set_body_json(json!({
3038                "message": "invalid credentials"
3039            })))
3040            .expect(1)
3041            .mount(&mock_server)
3042            .await;
3043
3044        let provider = TokenAuthProvider::new(&mock_server.uri(), "bad", "creds");
3045        let result = provider.auth_header().await;
3046        assert!(result.is_err());
3047        let err_msg = format!("{}", result.unwrap_err());
3048        assert!(err_msg.contains("401"));
3049    }
3050
3051    #[tokio::test]
3052    async fn test_sts_auth_provider_success() {
3053        let mock_server = MockServer::start().await;
3054
3055        Mock::given(method("POST"))
3056            .and(path("/sts/login"))
3057            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
3058                "Credentials": {
3059                    "AccessKeyId": "ASIATEMP",
3060                    "SecretAccessKey": "tempSecret",
3061                    "SessionToken": "FwoGZXIvY...",
3062                    "Expiration": 9999999999u64
3063                }
3064            })))
3065            .expect(1)
3066            .mount(&mock_server)
3067            .await;
3068
3069        let provider = StsAuthProvider::new(&mock_server.uri(), "my-oidc-token");
3070        let header = provider.auth_header().await.unwrap();
3071        let header_str = header.to_str().unwrap();
3072        assert!(header_str.starts_with("Basic "));
3073        // Decode and verify it uses the temp credentials
3074        let b64_part = &header_str["Basic ".len()..];
3075        let decoded = general_purpose::STANDARD.decode(b64_part).unwrap();
3076        let decoded_str = String::from_utf8(decoded).unwrap();
3077        assert_eq!(decoded_str, "ASIATEMP:tempSecret");
3078        assert_eq!(provider.mode_name(), "sts");
3079    }
3080
3081    #[tokio::test]
3082    async fn test_oidc_auth_provider_success() {
3083        let mock_server = MockServer::start().await;
3084
3085        Mock::given(method("POST"))
3086            .and(path("/api/v1/oidc/login"))
3087            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
3088                "token": "oidc-lakefs-jwt-token",
3089                "token_expiration": 9999999999u64
3090            })))
3091            .expect(1)
3092            .mount(&mock_server)
3093            .await;
3094
3095        let provider = OidcAuthProvider::new(&mock_server.uri(), "external-oidc-token");
3096        let header = provider.auth_header().await.unwrap();
3097        let header_str = header.to_str().unwrap();
3098        assert_eq!(header_str, "Bearer oidc-lakefs-jwt-token");
3099        assert_eq!(provider.mode_name(), "oidc");
3100    }
3101
3102    #[tokio::test]
3103    async fn test_saml_auth_provider_success() {
3104        let mock_server = MockServer::start().await;
3105
3106        Mock::given(method("POST"))
3107            .and(path("/api/v1/auth/external/saml"))
3108            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
3109                "token": "saml-lakefs-jwt-token",
3110                "token_expiration": 9999999999u64
3111            })))
3112            .expect(1)
3113            .mount(&mock_server)
3114            .await;
3115
3116        let provider = SamlAuthProvider::new(&mock_server.uri(), "PHNhbWxSZXNwb25zZT4=");
3117        let header = provider.auth_header().await.unwrap();
3118        let header_str = header.to_str().unwrap();
3119        assert_eq!(header_str, "Bearer saml-lakefs-jwt-token");
3120        assert_eq!(provider.mode_name(), "saml");
3121    }
3122
3123    #[tokio::test]
3124    async fn test_iam_auth_provider_mode_name() {
3125        let provider = IamAuthProvider::new();
3126        assert_eq!(provider.mode_name(), "iam");
3127    }
3128
3129    #[tokio::test]
3130    async fn test_build_auth_provider_basic() {
3131        let auth = LakeFSAuth::Basic {
3132            access_key: "ak".into(),
3133            secret_key: "sk".into(),
3134        };
3135        let provider = build_auth_provider(&auth, "http://localhost").unwrap();
3136        assert_eq!(provider.mode_name(), "basic");
3137        let header = provider.auth_header().await.unwrap();
3138        assert!(header.to_str().unwrap().starts_with("Basic "));
3139    }
3140
3141    #[tokio::test]
3142    async fn test_build_auth_provider_token() {
3143        let auth = LakeFSAuth::Token {
3144            access_key: "ak".into(),
3145            secret_key: "sk".into(),
3146        };
3147        let provider = build_auth_provider(&auth, "http://localhost").unwrap();
3148        assert_eq!(provider.mode_name(), "token");
3149    }
3150
3151    #[tokio::test]
3152    async fn test_build_auth_provider_iam() {
3153        let auth = LakeFSAuth::Iam;
3154        let provider = build_auth_provider(&auth, "http://localhost").unwrap();
3155        assert_eq!(provider.mode_name(), "iam");
3156    }
3157
3158    #[tokio::test]
3159    async fn test_backend_with_basic_auth_config() {
3160        let mock_server = MockServer::start().await;
3161
3162        let config = LakeFSConfig::new(
3163            mock_server.uri(),
3164            "test-repo",
3165            "main",
3166            "test_key",
3167            "test_secret",
3168        )
3169        .with_auth(LakeFSAuth::Basic {
3170            access_key: "test_key".into(),
3171            secret_key: "test_secret".into(),
3172        });
3173
3174        let backend = LakeFSBackend::new(config).unwrap();
3175        assert_eq!(backend.auth_mode(), "basic");
3176    }
3177
3178    #[tokio::test]
3179    async fn test_backend_with_token_auth_sends_bearer() {
3180        let mock_server = MockServer::start().await;
3181
3182        // Mock login endpoint
3183        Mock::given(method("POST"))
3184            .and(path("/api/v1/auth/login"))
3185            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
3186                "token": "test-jwt-token",
3187                "token_expiration": 9999999999u64
3188            })))
3189            .mount(&mock_server)
3190            .await;
3191
3192        // Mock repository endpoint (health check target)
3193        Mock::given(method("GET"))
3194            .and(path("/api/v1/repositories/test-repo"))
3195            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
3196                "id": "test-repo",
3197                "storage_namespace": "s3://test",
3198                "default_branch": "main"
3199            })))
3200            .mount(&mock_server)
3201            .await;
3202
3203        let config = LakeFSConfig::new(mock_server.uri(), "test-repo", "main", "ak", "sk")
3204            .with_auth(LakeFSAuth::Token {
3205                access_key: "ak".into(),
3206                secret_key: "sk".into(),
3207            });
3208
3209        let backend = LakeFSBackend::new(config).unwrap();
3210        assert_eq!(backend.auth_mode(), "token");
3211
3212        // Verify it can make an authenticated request
3213        let health = backend.health_check().await.unwrap();
3214        assert!(health);
3215    }
3216
3217    #[tokio::test]
3218    async fn test_backend_default_auth_is_basic() {
3219        let config = LakeFSConfig::new("http://localhost:8000", "test-repo", "main", "ak", "sk");
3220        // No .with_auth() → should default to Basic
3221        let backend = LakeFSBackend::new(config).unwrap();
3222        assert_eq!(backend.auth_mode(), "basic");
3223    }
3224}