Skip to main content

briefcase_core/storage/
lakefs.rs

1use super::{FlushResult, SnapshotQuery, StorageBackend, StorageError};
2use crate::models::{DecisionSnapshot, Snapshot};
3#[cfg(feature = "networking")]
4use base64::{engine::general_purpose, Engine as _};
5#[cfg(feature = "networking")]
6use reqwest::{
7    header::{HeaderMap, HeaderValue, AUTHORIZATION, CONTENT_TYPE},
8    Client, RequestBuilder, Response,
9};
10use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12use std::sync::{Arc, Mutex};
13
14// ─── Authentication types ────────────────────────────────────────────────────
15
/// Supported lakeFS authentication modes.
///
/// lakeFS supports multiple authentication mechanisms.  `LakeFSAuth`
/// captures the configuration needed for each.  Pass one of these to
/// [`LakeFSConfig::with_auth`] to select the desired mode.
///
/// When no explicit auth is configured (`None`), the backend falls back
/// to Basic auth using `access_key` / `secret_key` from the config.
///
/// The `Debug` implementation is written by hand so that secret material
/// (secret keys, OIDC tokens, SAML assertions) is never printed; only the
/// variant name and the access key ID are shown.
#[derive(Clone)]
pub enum LakeFSAuth {
    /// HTTP Basic authentication (access_key:secret_key).
    /// This is the lakeFS default and the backward-compatible mode.
    Basic {
        access_key: String,
        secret_key: String,
    },

    /// JWT token authentication.
    /// The client POSTs to `/api/v1/auth/login` with access_key/secret_key
    /// and receives a JWT.  The JWT is refreshed before expiry.
    Token {
        access_key: String,
        secret_key: String,
    },

    /// STS (Security Token Service) federation.
    /// The client POSTs an external OIDC token to `/sts/login` and receives
    /// temporary lakeFS credentials (access_key + secret_key + session_token).
    Sts {
        /// The external OIDC identity token to exchange.
        oidc_token: String,
    },

    /// IAM-based authentication (AWS instance metadata / ECS task role).
    /// Credentials are fetched from the EC2 instance metadata service or
    /// ECS container credentials endpoint at `169.254.169.254` / `169.254.170.2`.
    Iam,

    /// External OIDC provider authentication.
    /// The client sends an external OIDC token to `/api/v1/oidc/login` and
    /// receives a lakeFS JWT in return.
    Oidc {
        /// The external OIDC identity token.
        token: String,
    },

    /// SAML SSO authentication.
    /// The client POSTs a base64-encoded SAML assertion to
    /// `/api/v1/auth/external/saml` and receives a lakeFS JWT.
    Saml {
        /// Base64-encoded SAML assertion XML.
        assertion: String,
    },
}

impl std::fmt::Debug for LakeFSAuth {
    /// Formats the variant like the derived `Debug` would, but replaces every
    /// secret-bearing field with a fixed placeholder so credentials cannot
    /// leak through logs or panic messages.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        const REDACTED: &str = "<redacted>";
        match self {
            Self::Basic { access_key, .. } => f
                .debug_struct("Basic")
                .field("access_key", access_key)
                .field("secret_key", &REDACTED)
                .finish(),
            Self::Token { access_key, .. } => f
                .debug_struct("Token")
                .field("access_key", access_key)
                .field("secret_key", &REDACTED)
                .finish(),
            Self::Sts { .. } => f
                .debug_struct("Sts")
                .field("oidc_token", &REDACTED)
                .finish(),
            Self::Iam => f.write_str("Iam"),
            Self::Oidc { .. } => f.debug_struct("Oidc").field("token", &REDACTED).finish(),
            Self::Saml { .. } => f
                .debug_struct("Saml")
                .field("assertion", &REDACTED)
                .finish(),
        }
    }
}
70
71// ─── AuthProvider (internal) ─────────────────────────────────────────────────
72
/// Crate-internal abstraction over how a request gets its `Authorization` header.
///
/// There is one implementation per auth mode.  `LakeFSBackend` asks for a
/// header via `auth_header()` ahead of every HTTP request, which gives
/// token-based modes the chance to renew expired credentials without the
/// caller noticing.
#[cfg(all(feature = "async", feature = "networking"))]
#[async_trait::async_trait]
pub(crate) trait AuthProvider: Send + Sync {
    /// Short static label for this mode, intended for log output.
    fn mode_name(&self) -> &'static str;

    /// Produce the value to send in the `Authorization` header right now.
    ///
    /// Providers backed by expiring tokens are expected to detect expiry
    /// here and refresh before returning.
    async fn auth_header(&self) -> Result<HeaderValue, StorageError>;
}
90
91// ── BasicAuthProvider ────────────────────────────────────────────────────────
92
/// Provider for HTTP Basic auth; the header is computed once at construction.
#[cfg(all(feature = "async", feature = "networking"))]
pub(crate) struct BasicAuthProvider {
    /// Pre-built `Basic <base64(access_key:secret_key)>` header value.
    header: HeaderValue,
}

#[cfg(all(feature = "async", feature = "networking"))]
impl BasicAuthProvider {
    /// Builds the provider from a key pair.
    ///
    /// # Errors
    ///
    /// Returns `StorageError::ConnectionError` when the encoded credentials
    /// cannot be turned into a valid header value.
    pub fn new(access_key: &str, secret_key: &str) -> Result<Self, StorageError> {
        let pair = format!("{}:{}", access_key, secret_key);
        let token = general_purpose::STANDARD.encode(pair.as_bytes());
        match HeaderValue::from_str(&format!("Basic {}", token)) {
            Ok(header) => Ok(Self { header }),
            Err(e) => Err(StorageError::ConnectionError(format!(
                "Invalid Basic auth header: {}",
                e
            ))),
        }
    }
}
110
#[cfg(all(feature = "async", feature = "networking"))]
#[async_trait::async_trait]
impl AuthProvider for BasicAuthProvider {
    /// Static label identifying this provider.
    fn mode_name(&self) -> &'static str {
        "basic"
    }

    /// Hands back a copy of the header built at construction; never fails.
    async fn auth_header(&self) -> Result<HeaderValue, StorageError> {
        let value = self.header.clone();
        Ok(value)
    }
}
121
122// ── TokenAuthProvider (JWT via /api/v1/auth/login) ───────────────────────────
123
/// Provider that logs in with a key pair and authenticates with the returned JWT.
#[cfg(all(feature = "async", feature = "networking"))]
pub(crate) struct TokenAuthProvider {
    endpoint: String,
    access_key: String,
    secret_key: String,
    client: Client,
    /// Most recent JWT paired with its expiry (seconds since the UNIX epoch).
    cache: Mutex<Option<(String, u64)>>,
}

#[cfg(all(feature = "async", feature = "networking"))]
impl TokenAuthProvider {
    /// Creates a provider for `endpoint` (trailing slashes removed) with an
    /// empty token cache.
    ///
    /// The login client is built with a 10-second timeout; if the builder
    /// fails, the default client is used instead.
    pub fn new(endpoint: &str, access_key: &str, secret_key: &str) -> Self {
        let login_timeout = std::time::Duration::from_secs(10);
        let client = Client::builder()
            .timeout(login_timeout)
            .build()
            .unwrap_or_default();
        let endpoint = endpoint.trim_end_matches('/').to_string();
        Self {
            endpoint,
            access_key: access_key.to_string(),
            secret_key: secret_key.to_string(),
            client,
            cache: Mutex::new(None),
        }
    }

    /// Logs in against `/api/v1/auth/login` and returns `(jwt, expiry_secs)`.
    ///
    /// When the response carries no `token_expiration`, the expiry is assumed
    /// to be one hour from now.
    ///
    /// # Errors
    ///
    /// Returns `StorageError::ConnectionError` if the request cannot be sent,
    /// the server answers with a non-success status, or the body cannot be
    /// parsed.
    async fn fetch_token(&self) -> Result<(String, u64), StorageError> {
        #[derive(Serialize)]
        struct LoginRequest {
            access_key_id: String,
            secret_access_key: String,
        }

        #[derive(Deserialize)]
        struct LoginResponse {
            token: String,
            #[serde(default)]
            token_expiration: Option<u64>,
        }

        let payload = LoginRequest {
            access_key_id: self.access_key.clone(),
            secret_access_key: self.secret_key.clone(),
        };
        let url = format!("{}/api/v1/auth/login", self.endpoint);

        let response = match self.client.post(&url).json(&payload).send().await {
            Ok(response) => response,
            Err(e) => {
                return Err(StorageError::ConnectionError(format!(
                    "Token login failed: {}",
                    e
                )))
            }
        };

        let status = response.status();
        if !status.is_success() {
            let text = response.text().await.unwrap_or_default();
            return Err(StorageError::ConnectionError(format!(
                "Token login returned {}: {}",
                status, text
            )));
        }

        let LoginResponse {
            token,
            token_expiration,
        } = response.json::<LoginResponse>().await.map_err(|e| {
            StorageError::ConnectionError(format!("Failed to parse login response: {}", e))
        })?;

        // No server-provided expiry: assume one hour from now.
        let expiry = match token_expiration {
            Some(at) => at,
            None => {
                let since_epoch = std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .unwrap_or_default();
                since_epoch.as_secs() + 3600
            }
        };

        Ok((token, expiry))
    }
}
202
#[cfg(all(feature = "async", feature = "networking"))]
#[async_trait::async_trait]
impl AuthProvider for TokenAuthProvider {
    /// Returns a `Bearer <jwt>` header, logging in again when the cached JWT
    /// is missing, expired, or within 60 seconds of expiry.
    ///
    /// The cache lock is never held across an `.await`, and a poisoned lock
    /// is recovered rather than turned into a panic: the cache is only ever
    /// replaced by a single assignment, so its contents stay consistent.
    ///
    /// # Errors
    ///
    /// Returns `StorageError::ConnectionError` if the login fails or the
    /// token cannot be encoded as a header value.
    async fn auth_header(&self) -> Result<HeaderValue, StorageError> {
        // Fast path: reuse the cached token while it is comfortably valid.
        let cached = {
            let cache = self
                .cache
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            let now = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_secs();
            match cache.as_ref() {
                Some((token, expiry)) if now + 60 < *expiry => Some(token.clone()),
                _ => None,
            }
        };

        let token = match cached {
            Some(token) => token,
            None => {
                let (token, expiry) = self.fetch_token().await?;
                let mut cache = self
                    .cache
                    .lock()
                    .unwrap_or_else(|poisoned| poisoned.into_inner());
                *cache = Some((token.clone(), expiry));
                token
            }
        };

        let header_str = format!("Bearer {}", token);
        HeaderValue::from_str(&header_str)
            .map_err(|e| StorageError::ConnectionError(format!("Invalid Bearer header: {}", e)))
    }

    /// Static label identifying this provider.
    fn mode_name(&self) -> &'static str {
        "token"
    }
}
239
240// ── StsAuthProvider (POST /sts/login with OIDC token) ────────────────────────
241
/// Provider that exchanges an external OIDC token for temporary lakeFS
/// credentials via `/sts/login`.
#[cfg(all(feature = "async", feature = "networking"))]
pub(crate) struct StsAuthProvider {
    endpoint: String,
    oidc_token: String,
    client: Client,
    /// Cached temp credentials: (access_key, secret_key, session_token, expiry_secs).
    cache: Mutex<Option<(String, String, String, u64)>>,
}

#[cfg(all(feature = "async", feature = "networking"))]
impl StsAuthProvider {
    /// Creates a provider for `endpoint` (trailing slashes removed) with an
    /// empty credential cache.
    ///
    /// The client is built with a 10-second timeout; if the builder fails,
    /// the default client is used instead.
    pub fn new(endpoint: &str, oidc_token: &str) -> Self {
        let client = Client::builder()
            .timeout(std::time::Duration::from_secs(10))
            .build()
            .unwrap_or_default();
        Self {
            endpoint: endpoint.trim_end_matches('/').to_string(),
            oidc_token: oidc_token.to_string(),
            client,
            cache: Mutex::new(None),
        }
    }

    /// Percent-encodes `raw` for use as an `application/x-www-form-urlencoded`
    /// value.  Only RFC 3986 unreserved characters are left untouched.
    fn form_urlencode(raw: &str) -> String {
        use std::fmt::Write as _;

        let mut encoded = String::with_capacity(raw.len());
        for byte in raw.bytes() {
            match byte {
                b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'_' | b'.' | b'~' => {
                    encoded.push(char::from(byte));
                }
                _ => {
                    // Writing into a `String` cannot fail.
                    let _ = write!(encoded, "%{:02X}", byte);
                }
            }
        }
        encoded
    }

    /// Requests temporary credentials and returns
    /// `(access_key, secret_key, session_token, expiry_secs)`.
    ///
    /// When the response carries no `Expiration`, the expiry is assumed to be
    /// one hour from now.
    ///
    /// # Errors
    ///
    /// Returns `StorageError::ConnectionError` if the request cannot be sent,
    /// the server answers with a non-success status, or the body cannot be
    /// parsed.
    async fn fetch_temp_credentials(&self) -> Result<(String, String, String, u64), StorageError> {
        #[derive(Deserialize)]
        struct StsResponse {
            #[serde(rename = "Credentials")]
            credentials: StsCredentials,
        }

        #[derive(Deserialize)]
        struct StsCredentials {
            #[serde(rename = "AccessKeyId")]
            access_key_id: String,
            #[serde(rename = "SecretAccessKey")]
            secret_access_key: String,
            #[serde(rename = "SessionToken")]
            session_token: String,
            #[serde(rename = "Expiration", default)]
            expiration: Option<u64>,
        }

        let url = format!("{}/sts/login", self.endpoint);

        // The token is percent-encoded so characters such as `+`, `/`, `=`
        // or `&` cannot corrupt the form body or split it into extra fields.
        let form_body = format!(
            "Action=AssumeRoleWithWebIdentity&WebIdentityToken={}&Version=2011-06-15",
            Self::form_urlencode(&self.oidc_token)
        );

        let resp = self
            .client
            .post(&url)
            .header("Content-Type", "application/x-www-form-urlencoded")
            .body(form_body)
            .send()
            .await
            .map_err(|e| StorageError::ConnectionError(format!("STS login failed: {}", e)))?;

        if !resp.status().is_success() {
            let status = resp.status();
            let text = resp.text().await.unwrap_or_default();
            return Err(StorageError::ConnectionError(format!(
                "STS login returned {}: {}",
                status, text
            )));
        }

        let sts: StsResponse = resp.json().await.map_err(|e| {
            StorageError::ConnectionError(format!("Failed to parse STS response: {}", e))
        })?;

        // No server-provided expiry: assume one hour from now.
        let expiry = sts.credentials.expiration.unwrap_or_else(|| {
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_secs()
                + 3600
        });

        Ok((
            sts.credentials.access_key_id,
            sts.credentials.secret_access_key,
            sts.credentials.session_token,
            expiry,
        ))
    }
}
326
#[cfg(all(feature = "async", feature = "networking"))]
#[async_trait::async_trait]
impl AuthProvider for StsAuthProvider {
    /// Returns a `Basic` header built from the temporary access/secret key,
    /// fetching new credentials when the cached ones are missing, expired,
    /// or within 60 seconds of expiry.
    ///
    /// The cache lock is never held across an `.await`, and a poisoned lock
    /// is recovered rather than turned into a panic: the cache is only ever
    /// replaced by a single assignment, so its contents stay consistent.
    ///
    /// # Errors
    ///
    /// Returns `StorageError::ConnectionError` if the credential exchange
    /// fails or the header value cannot be built.
    async fn auth_header(&self) -> Result<HeaderValue, StorageError> {
        // Fast path: reuse the cached key pair while it is comfortably valid.
        let cached = {
            let cache = self
                .cache
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            let now = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_secs();
            match cache.as_ref() {
                Some((ak, sk, _, expiry)) if now + 60 < *expiry => Some((ak.clone(), sk.clone())),
                _ => None,
            }
        };

        let (ak, sk) = match cached {
            Some(pair) => pair,
            None => {
                let creds = self.fetch_temp_credentials().await?;
                let pair = (creds.0.clone(), creds.1.clone());
                let mut cache = self
                    .cache
                    .lock()
                    .unwrap_or_else(|poisoned| poisoned.into_inner());
                *cache = Some(creds);
                pair
            }
        };

        // STS temp credentials use Basic auth with the temp access_key:secret_key.
        // The session token is sent as `X-Lakefs-Session-Token` but since we only
        // control the Authorization header here, we encode access_key:secret_key.
        let credentials = format!("{}:{}", ak, sk);
        let encoded = general_purpose::STANDARD.encode(credentials.as_bytes());
        let header_str = format!("Basic {}", encoded);
        HeaderValue::from_str(&header_str)
            .map_err(|e| StorageError::ConnectionError(format!("Invalid STS auth header: {}", e)))
    }

    /// Static label identifying this provider.
    fn mode_name(&self) -> &'static str {
        "sts"
    }
}
367
368// ── IamAuthProvider (AWS instance metadata / ECS task role) ──────────────────
369
/// Provider that sources credentials from the AWS ECS container credentials
/// endpoint or, failing that, the EC2 instance metadata service (IMDSv2).
#[cfg(all(feature = "async", feature = "networking"))]
pub(crate) struct IamAuthProvider {
    client: Client,
    /// Cached credentials: (access_key, secret_key, token, expiry_secs).
    cache: Mutex<Option<IamCredentialCache>>,
}

/// `(access_key, secret_key, optional session token, expiry in seconds since
/// the UNIX epoch)`.
#[cfg(all(feature = "async", feature = "networking"))]
type IamCredentialCache = (String, String, Option<String>, u64);

#[cfg(all(feature = "async", feature = "networking"))]
impl IamAuthProvider {
    /// Creates a provider with an empty credential cache.
    ///
    /// The metadata client is built with a 5-second timeout; if the builder
    /// fails, the default client is used instead.
    pub fn new() -> Self {
        let client = Client::builder()
            .timeout(std::time::Duration::from_secs(5))
            .build()
            .unwrap_or_default();
        Self {
            client,
            cache: Mutex::new(None),
        }
    }

    /// Expiry assigned to freshly fetched credentials: one hour from now.
    fn default_expiry() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs()
            + 3600
    }

    /// Try ECS container credentials first, then fall back to EC2 instance metadata.
    ///
    /// The ECS attempt is best-effort: any failure there silently falls
    /// through to IMDS.  Every IMDS response is status-checked so that an
    /// HTTP error body is never mistaken for a session token, a role name,
    /// or a credentials document.
    ///
    /// # Errors
    ///
    /// Returns `StorageError::ConnectionError` if any IMDS request fails,
    /// is rejected with a non-success status, lists no role, or returns a
    /// credentials document that cannot be parsed.
    async fn fetch_credentials(&self) -> Result<IamCredentialCache, StorageError> {
        // 1. Try ECS container credentials endpoint
        if let Ok(creds_uri) = std::env::var("AWS_CONTAINER_CREDENTIALS_RELATIVE_URI") {
            let url = format!("http://169.254.170.2{}", creds_uri);
            if let Ok(resp) = self.client.get(&url).send().await {
                if resp.status().is_success() {
                    if let Ok(creds) = resp.json::<IamCredentials>().await {
                        return Ok((
                            creds.access_key_id,
                            creds.secret_access_key,
                            creds.token,
                            Self::default_expiry(),
                        ));
                    }
                }
            }
        }

        // 2. Fall back to EC2 instance metadata (IMDSv2)
        let token_resp = self
            .client
            .put("http://169.254.169.254/latest/api/token")
            .header("X-aws-ec2-metadata-token-ttl-seconds", "21600")
            .send()
            .await
            .map_err(|e| {
                StorageError::ConnectionError(format!("IAM: Failed to fetch IMDS token: {}", e))
            })?
            .error_for_status()
            .map_err(|e| {
                StorageError::ConnectionError(format!("IAM: IMDS token request rejected: {}", e))
            })?;

        let imds_token = token_resp.text().await.map_err(|e| {
            StorageError::ConnectionError(format!("IAM: Failed to read IMDS token: {}", e))
        })?;
        // Stray whitespace would make the token an invalid header value.
        let imds_token = imds_token.trim();

        // Get role name
        let role_resp = self
            .client
            .get("http://169.254.169.254/latest/meta-data/iam/security-credentials/")
            .header("X-aws-ec2-metadata-token", imds_token)
            .send()
            .await
            .map_err(|e| {
                StorageError::ConnectionError(format!("IAM: Failed to fetch role name: {}", e))
            })?
            .error_for_status()
            .map_err(|e| {
                StorageError::ConnectionError(format!("IAM: Role name request rejected: {}", e))
            })?;

        let role_listing = role_resp.text().await.map_err(|e| {
            StorageError::ConnectionError(format!("IAM: Failed to read role name: {}", e))
        })?;
        // The listing is newline-separated; use the first non-empty entry.
        let role_name = role_listing
            .lines()
            .map(str::trim)
            .find(|line| !line.is_empty())
            .ok_or_else(|| {
                StorageError::ConnectionError(
                    "IAM: Instance metadata returned no IAM role".to_string(),
                )
            })?;

        // Get credentials for role
        let creds_url = format!(
            "http://169.254.169.254/latest/meta-data/iam/security-credentials/{}",
            role_name
        );
        let creds_resp = self
            .client
            .get(&creds_url)
            .header("X-aws-ec2-metadata-token", imds_token)
            .send()
            .await
            .map_err(|e| {
                StorageError::ConnectionError(format!("IAM: Failed to fetch credentials: {}", e))
            })?
            .error_for_status()
            .map_err(|e| {
                StorageError::ConnectionError(format!("IAM: Credentials request rejected: {}", e))
            })?;

        let creds: IamCredentials = creds_resp.json().await.map_err(|e| {
            StorageError::ConnectionError(format!("IAM: Failed to parse credentials: {}", e))
        })?;

        Ok((
            creds.access_key_id,
            creds.secret_access_key,
            creds.token,
            Self::default_expiry(),
        ))
    }
}
479
/// Credentials document deserialized from the ECS container endpoint or the
/// EC2 instance metadata service.  Both PascalCase and camelCase key
/// spellings are accepted.
#[cfg(all(feature = "async", feature = "networking"))]
#[derive(Deserialize)]
struct IamCredentials {
    #[serde(rename = "AccessKeyId")]
    #[serde(alias = "accessKeyId")]
    access_key_id: String,
    #[serde(rename = "SecretAccessKey")]
    #[serde(alias = "secretAccessKey")]
    secret_access_key: String,
    /// Session token; `None` when the document carries none of the accepted keys.
    #[serde(default, rename = "Token", alias = "token", alias = "SessionToken")]
    token: Option<String>,
}
490
#[cfg(all(feature = "async", feature = "networking"))]
#[async_trait::async_trait]
impl AuthProvider for IamAuthProvider {
    /// Returns a `Basic` header built from the IAM access/secret key,
    /// fetching new credentials when the cached ones are missing, expired,
    /// or within 60 seconds of expiry.
    ///
    /// The cache lock is never held across an `.await`, and a poisoned lock
    /// is recovered rather than turned into a panic: the cache is only ever
    /// replaced by a single assignment, so its contents stay consistent.
    ///
    /// # Errors
    ///
    /// Returns `StorageError::ConnectionError` if the credential lookup
    /// fails or the header value cannot be built.
    async fn auth_header(&self) -> Result<HeaderValue, StorageError> {
        // Fast path: reuse the cached key pair while it is comfortably valid.
        let cached = {
            let cache = self
                .cache
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            let now = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_secs();
            match cache.as_ref() {
                Some((ak, sk, _, expiry)) if now + 60 < *expiry => Some((ak.clone(), sk.clone())),
                _ => None,
            }
        };

        let (ak, sk) = match cached {
            Some(pair) => pair,
            None => {
                let creds = self.fetch_credentials().await?;
                let pair = (creds.0.clone(), creds.1.clone());
                let mut cache = self
                    .cache
                    .lock()
                    .unwrap_or_else(|poisoned| poisoned.into_inner());
                *cache = Some(creds);
                pair
            }
        };

        let credentials = format!("{}:{}", ak, sk);
        let encoded = general_purpose::STANDARD.encode(credentials.as_bytes());
        let header_str = format!("Basic {}", encoded);
        HeaderValue::from_str(&header_str)
            .map_err(|e| StorageError::ConnectionError(format!("Invalid IAM auth header: {}", e)))
    }

    /// Static label identifying this provider.
    fn mode_name(&self) -> &'static str {
        "iam"
    }
}
528
529// ── OidcAuthProvider (POST /api/v1/oidc/login) ──────────────────────────────
530
/// Provider that exchanges an external OIDC token for a lakeFS JWT via
/// `/api/v1/oidc/login`.
#[cfg(all(feature = "async", feature = "networking"))]
pub(crate) struct OidcAuthProvider {
    endpoint: String,
    external_token: String,
    client: Client,
    /// Cached lakeFS JWT + expiry.
    cache: Mutex<Option<(String, u64)>>,
}

#[cfg(all(feature = "async", feature = "networking"))]
impl OidcAuthProvider {
    /// Creates a provider for `endpoint` (trailing slashes removed) with an
    /// empty token cache.
    ///
    /// The client is built with a 10-second timeout; if the builder fails,
    /// the default client is used instead.
    pub fn new(endpoint: &str, external_token: &str) -> Self {
        let client = Client::builder()
            .timeout(std::time::Duration::from_secs(10))
            .build()
            .unwrap_or_default();
        Self {
            endpoint: endpoint.trim_end_matches('/').to_string(),
            external_token: external_token.to_string(),
            client,
            cache: Mutex::new(None),
        }
    }

    /// Percent-encodes `raw` for use as an `application/x-www-form-urlencoded`
    /// value.  Only RFC 3986 unreserved characters are left untouched.
    fn form_urlencode(raw: &str) -> String {
        use std::fmt::Write as _;

        let mut encoded = String::with_capacity(raw.len());
        for byte in raw.bytes() {
            match byte {
                b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'_' | b'.' | b'~' => {
                    encoded.push(char::from(byte));
                }
                _ => {
                    // Writing into a `String` cannot fail.
                    let _ = write!(encoded, "%{:02X}", byte);
                }
            }
        }
        encoded
    }

    /// Exchanges the external token and returns `(jwt, expiry_secs)`.
    ///
    /// When the response carries no `token_expiration`, the expiry is assumed
    /// to be one hour from now.
    ///
    /// # Errors
    ///
    /// Returns `StorageError::ConnectionError` if the request cannot be sent,
    /// the server answers with a non-success status, or the body cannot be
    /// parsed.
    async fn exchange_token(&self) -> Result<(String, u64), StorageError> {
        #[derive(Deserialize)]
        struct OidcResponse {
            token: String,
            #[serde(default)]
            token_expiration: Option<u64>,
        }

        let url = format!("{}/api/v1/oidc/login", self.endpoint);

        // The token is percent-encoded so reserved characters cannot corrupt
        // the form body or split it into extra fields.
        let form_body = format!("code={}", Self::form_urlencode(&self.external_token));

        let resp = self
            .client
            .post(&url)
            .header("Content-Type", "application/x-www-form-urlencoded")
            .body(form_body)
            .send()
            .await
            .map_err(|e| StorageError::ConnectionError(format!("OIDC login failed: {}", e)))?;

        if !resp.status().is_success() {
            let status = resp.status();
            let text = resp.text().await.unwrap_or_default();
            return Err(StorageError::ConnectionError(format!(
                "OIDC login returned {}: {}",
                status, text
            )));
        }

        let oidc: OidcResponse = resp.json().await.map_err(|e| {
            StorageError::ConnectionError(format!("Failed to parse OIDC response: {}", e))
        })?;

        // No server-provided expiry: assume one hour from now.
        let expiry = oidc.token_expiration.unwrap_or_else(|| {
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_secs()
                + 3600
        });

        Ok((oidc.token, expiry))
    }
}
596
#[cfg(all(feature = "async", feature = "networking"))]
#[async_trait::async_trait]
impl AuthProvider for OidcAuthProvider {
    /// Returns a `Bearer <jwt>` header, repeating the token exchange when the
    /// cached JWT is missing, expired, or within 60 seconds of expiry.
    ///
    /// The cache lock is never held across an `.await`, and a poisoned lock
    /// is recovered rather than turned into a panic: the cache is only ever
    /// replaced by a single assignment, so its contents stay consistent.
    ///
    /// # Errors
    ///
    /// Returns `StorageError::ConnectionError` if the exchange fails or the
    /// token cannot be encoded as a header value.
    async fn auth_header(&self) -> Result<HeaderValue, StorageError> {
        // Fast path: reuse the cached token while it is comfortably valid.
        let cached = {
            let cache = self
                .cache
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            let now = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_secs();
            match cache.as_ref() {
                Some((token, expiry)) if now + 60 < *expiry => Some(token.clone()),
                _ => None,
            }
        };

        let token = match cached {
            Some(token) => token,
            None => {
                let (token, expiry) = self.exchange_token().await?;
                let mut cache = self
                    .cache
                    .lock()
                    .unwrap_or_else(|poisoned| poisoned.into_inner());
                *cache = Some((token.clone(), expiry));
                token
            }
        };

        let header_str = format!("Bearer {}", token);
        HeaderValue::from_str(&header_str).map_err(|e| {
            StorageError::ConnectionError(format!("Invalid OIDC Bearer header: {}", e))
        })
    }

    /// Static label identifying this provider.
    fn mode_name(&self) -> &'static str {
        "oidc"
    }
}
633
634// ── SamlAuthProvider (POST /api/v1/auth/external/saml) ──────────────────────
635
/// Provider that exchanges a base64-encoded SAML assertion for a lakeFS JWT
/// via `/api/v1/auth/external/saml`.
#[cfg(all(feature = "async", feature = "networking"))]
pub(crate) struct SamlAuthProvider {
    endpoint: String,
    assertion: String,
    client: Client,
    /// Cached lakeFS JWT + expiry.
    cache: Mutex<Option<(String, u64)>>,
}

#[cfg(all(feature = "async", feature = "networking"))]
impl SamlAuthProvider {
    /// Creates a provider for `endpoint` (trailing slashes removed) with an
    /// empty token cache.
    ///
    /// The client is built with a 10-second timeout; if the builder fails,
    /// the default client is used instead.
    pub fn new(endpoint: &str, assertion: &str) -> Self {
        let client = Client::builder()
            .timeout(std::time::Duration::from_secs(10))
            .build()
            .unwrap_or_default();
        Self {
            endpoint: endpoint.trim_end_matches('/').to_string(),
            assertion: assertion.to_string(),
            client,
            cache: Mutex::new(None),
        }
    }

    /// Percent-encodes `raw` for use as an `application/x-www-form-urlencoded`
    /// value.  Only RFC 3986 unreserved characters are left untouched.
    fn form_urlencode(raw: &str) -> String {
        use std::fmt::Write as _;

        let mut encoded = String::with_capacity(raw.len());
        for byte in raw.bytes() {
            match byte {
                b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'_' | b'.' | b'~' => {
                    encoded.push(char::from(byte));
                }
                _ => {
                    // Writing into a `String` cannot fail.
                    let _ = write!(encoded, "%{:02X}", byte);
                }
            }
        }
        encoded
    }

    /// Exchanges the assertion and returns `(jwt, expiry_secs)`.
    ///
    /// When the response carries no `token_expiration`, the expiry is assumed
    /// to be one hour from now.
    ///
    /// # Errors
    ///
    /// Returns `StorageError::ConnectionError` if the request cannot be sent,
    /// the server answers with a non-success status, or the body cannot be
    /// parsed.
    async fn exchange_assertion(&self) -> Result<(String, u64), StorageError> {
        #[derive(Deserialize)]
        struct SamlResponse {
            token: String,
            #[serde(default)]
            token_expiration: Option<u64>,
        }

        let url = format!("{}/api/v1/auth/external/saml", self.endpoint);

        // Base64 uses `+`, `/` and `=`; left unencoded in a form body, `+`
        // would be decoded as a space and corrupt the assertion.
        let form_body = format!("SAMLResponse={}", Self::form_urlencode(&self.assertion));

        let resp = self
            .client
            .post(&url)
            .header("Content-Type", "application/x-www-form-urlencoded")
            .body(form_body)
            .send()
            .await
            .map_err(|e| StorageError::ConnectionError(format!("SAML login failed: {}", e)))?;

        if !resp.status().is_success() {
            let status = resp.status();
            let text = resp.text().await.unwrap_or_default();
            return Err(StorageError::ConnectionError(format!(
                "SAML login returned {}: {}",
                status, text
            )));
        }

        let saml: SamlResponse = resp.json().await.map_err(|e| {
            StorageError::ConnectionError(format!("Failed to parse SAML response: {}", e))
        })?;

        // No server-provided expiry: assume one hour from now.
        let expiry = saml.token_expiration.unwrap_or_else(|| {
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_secs()
                + 3600
        });

        Ok((saml.token, expiry))
    }
}
701
#[cfg(all(feature = "async", feature = "networking"))]
#[async_trait::async_trait]
impl AuthProvider for SamlAuthProvider {
    /// Returns a `Bearer <jwt>` header, repeating the assertion exchange when
    /// the cached JWT is missing, expired, or within 60 seconds of expiry.
    ///
    /// The cache lock is never held across an `.await`, and a poisoned lock
    /// is recovered rather than turned into a panic: the cache is only ever
    /// replaced by a single assignment, so its contents stay consistent.
    ///
    /// # Errors
    ///
    /// Returns `StorageError::ConnectionError` if the exchange fails or the
    /// token cannot be encoded as a header value.
    async fn auth_header(&self) -> Result<HeaderValue, StorageError> {
        // Fast path: reuse the cached token while it is comfortably valid.
        let cached = {
            let cache = self
                .cache
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            let now = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_secs();
            match cache.as_ref() {
                Some((token, expiry)) if now + 60 < *expiry => Some(token.clone()),
                _ => None,
            }
        };

        let token = match cached {
            Some(token) => token,
            None => {
                let (token, expiry) = self.exchange_assertion().await?;
                let mut cache = self
                    .cache
                    .lock()
                    .unwrap_or_else(|poisoned| poisoned.into_inner());
                *cache = Some((token.clone(), expiry));
                token
            }
        };

        let header_str = format!("Bearer {}", token);
        HeaderValue::from_str(&header_str).map_err(|e| {
            StorageError::ConnectionError(format!("Invalid SAML Bearer header: {}", e))
        })
    }

    /// Static label identifying this provider.
    fn mode_name(&self) -> &'static str {
        "saml"
    }
}
738
739// ── Build AuthProvider from LakeFSAuth enum ─────────────────────────────────
740
/// Instantiates the provider matching `auth`, pointing the network-backed
/// modes at `endpoint`.
///
/// # Errors
///
/// Only the Basic mode can fail here, when its header cannot be built; the
/// other providers are constructed infallibly.
#[cfg(all(feature = "async", feature = "networking"))]
fn build_auth_provider(
    auth: &LakeFSAuth,
    endpoint: &str,
) -> Result<Arc<dyn AuthProvider>, StorageError> {
    let provider: Arc<dyn AuthProvider> = match auth {
        LakeFSAuth::Basic {
            access_key,
            secret_key,
        } => {
            let basic = BasicAuthProvider::new(access_key, secret_key)?;
            Arc::new(basic)
        }
        LakeFSAuth::Token {
            access_key,
            secret_key,
        } => Arc::new(TokenAuthProvider::new(endpoint, access_key, secret_key)),
        LakeFSAuth::Sts { oidc_token } => Arc::new(StsAuthProvider::new(endpoint, oidc_token)),
        LakeFSAuth::Iam => Arc::new(IamAuthProvider::new()),
        LakeFSAuth::Oidc { token } => Arc::new(OidcAuthProvider::new(endpoint, token)),
        LakeFSAuth::Saml { assertion } => Arc::new(SamlAuthProvider::new(endpoint, assertion)),
    };
    Ok(provider)
}
763
764// ─── New response / request types ────────────────────────────────────────────
765
766/// Result of a branch merge operation.
767#[derive(Debug, Clone, Serialize, Deserialize)]
768pub struct MergeResult {
769    /// The commit ID produced by the merge (empty string on conflict).
770    pub commit_id: String,
771    /// Summary text returned by LakeFS.
772    pub summary: String,
773}
774
775/// Information about a single LakeFS commit.
776#[derive(Debug, Clone, Serialize, Deserialize)]
777pub struct CommitInfo {
778    pub id: String,
779    pub message: String,
780    pub committer: String,
781    #[serde(default)]
782    pub metadata: HashMap<String, String>,
783    /// Unix-epoch seconds.  LakeFS returns `creation_date` as an i64.
784    #[serde(default)]
785    pub creation_date: i64,
786}
787
788/// A single entry in a diff listing.
789#[derive(Debug, Clone, Serialize, Deserialize)]
790pub struct DiffEntry {
791    /// Object path.
792    pub path: String,
793    /// "added", "removed", "changed", or "conflict".
794    #[serde(rename = "type")]
795    pub diff_type: String,
796    /// Size in bytes (only for added / changed).
797    #[serde(default)]
798    pub size_bytes: Option<u64>,
799}
800
801/// Object metadata returned by the stat endpoint.
802#[derive(Debug, Clone, Serialize, Deserialize)]
803pub struct ObjectStats {
804    pub path: String,
805    #[serde(default)]
806    pub physical_address: String,
807    pub size_bytes: u64,
808    #[serde(default)]
809    pub content_type: String,
810    /// Unix-epoch seconds.
811    #[serde(default)]
812    pub mtime: i64,
813    #[serde(default)]
814    pub checksum: String,
815}
816
817/// Describes a LakeFS Action (webhook) to register.
818#[derive(Debug, Clone, Serialize, Deserialize)]
819pub struct LakeFSAction {
820    pub name: String,
821    /// e.g. `["pre-commit", "post-merge"]`
822    pub on: Vec<String>,
823    pub hooks: Vec<ActionHook>,
824}
825
826/// A single hook inside a LakeFS Action.
827#[derive(Debug, Clone, Serialize, Deserialize)]
828pub struct ActionHook {
829    pub id: String,
830    #[serde(rename = "type")]
831    pub hook_type: String,
832    pub description: String,
833    #[serde(default)]
834    pub properties: HashMap<String, String>,
835}
836
837/// Repository information returned by LakeFS.
838#[derive(Debug, Clone, Serialize, Deserialize)]
839pub struct RepositoryInfo {
840    pub id: String,
841    pub storage_namespace: String,
842    pub default_branch: String,
843    #[serde(default)]
844    pub creation_date: i64,
845}
846
847/// Branch reference returned by LakeFS.
848#[derive(Debug, Clone, Serialize, Deserialize)]
849pub struct BranchInfo {
850    pub id: String,
851    pub commit_id: String,
852}
853
854// ─── Core backend ────────────────────────────────────────────────────────────
855
/// Storage backend that talks to a lakeFS server over HTTP.
///
/// Cloning is cheap for the shared parts: the auth provider and the pending
/// write buffer are behind `Arc`, so clones share them.
#[cfg(all(feature = "async", feature = "networking"))]
#[derive(Clone)]
pub struct LakeFSBackend {
    /// HTTP client used for lakeFS API calls.
    client: Client,
    /// Base URL of the lakeFS server, stored without a trailing slash.
    endpoint: String,
    repository: String, // "engagement" in Briefcase terminology
    branch: String,     // "workstream" in Briefcase terminology
    // NOTE(review): appears to be retained from the config but unread now
    // that authorization goes through `auth_provider` — confirm.
    #[allow(dead_code)]
    access_key: String,
    // NOTE(review): same as `access_key` above — confirm it is still needed.
    #[allow(dead_code)]
    secret_key: String,
    /// Per-request auth provider.  Replaces static default headers so that
    /// token-based modes (JWT, STS, OIDC, SAML) can refresh transparently.
    auth_provider: Arc<dyn AuthProvider>,
    /// Buffer of snapshots shared between clones; starts empty.
    /// NOTE(review): presumably drained on flush — verify against the impl.
    pending_writes: Arc<Mutex<Vec<Snapshot>>>,
}
872
873#[cfg(all(feature = "async", feature = "networking"))]
874impl LakeFSBackend {
875    pub fn new(config: LakeFSConfig) -> Result<Self, StorageError> {
876        let endpoint = config.endpoint.trim_end_matches('/').to_string();
877
878        // Build auth provider from explicit config or default to Basic.
879        let auth = config.auth.clone().unwrap_or_else(|| LakeFSAuth::Basic {
880            access_key: config.access_key.clone(),
881            secret_key: config.secret_key.clone(),
882        });
883        let auth_provider = build_auth_provider(&auth, &endpoint)?;
884
885        // Default headers — Content-Type only.  Authorization is per-request.
886        let mut headers = HeaderMap::new();
887        headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
888
889        let client = Client::builder()
890            .default_headers(headers)
891            .timeout(std::time::Duration::from_secs(30))
892            .build()
893            .map_err(|e| {
894                StorageError::ConnectionError(format!("Failed to create HTTP client: {}", e))
895            })?;
896
897        Ok(Self {
898            client,
899            endpoint,
900            repository: config.repository,
901            branch: config.branch,
902            access_key: config.access_key,
903            secret_key: config.secret_key,
904            auth_provider,
905            pending_writes: Arc::new(Mutex::new(Vec::new())),
906        })
907    }
908
909    /// Send an HTTP request with the current authorization header.
910    ///
911    /// All lakeFS API calls route through this method so that token refresh
912    /// happens transparently for JWT/STS/OIDC/SAML modes.
913    async fn send_authed(&self, builder: RequestBuilder) -> Result<Response, StorageError> {
914        let auth_header = self.auth_provider.auth_header().await?;
915        builder
916            .header(AUTHORIZATION, auth_header)
917            .send()
918            .await
919            .map_err(|e| StorageError::ConnectionError(format!("Request failed: {}", e)))
920    }
921
    /// Returns the active authentication mode name (for logging/diagnostics).
    ///
    /// The value is whatever the configured auth provider reports from
    /// `mode_name()`; this method adds no logic of its own.
    pub fn auth_mode(&self) -> &'static str {
        self.auth_provider.mode_name()
    }
926
927    // ── Accessors ────────────────────────────────────────────────────────
928
929    /// Returns the configured repository name.
930    pub fn repository(&self) -> &str {
931        &self.repository
932    }
933
934    /// Returns the configured branch name.
935    pub fn branch(&self) -> &str {
936        &self.branch
937    }
938
939    /// Returns the configured endpoint URL.
940    pub fn endpoint(&self) -> &str {
941        &self.endpoint
942    }
943
944    // ── Commit operations ────────────────────────────────────────────────
945
946    /// Create a commit (checkpoint) with pending writes.
947    pub async fn create_commit(&self, message: &str) -> Result<String, StorageError> {
948        let url = format!(
949            "{}/api/v1/repositories/{}/branches/{}/commits",
950            self.endpoint, self.repository, self.branch
951        );
952
953        #[derive(Serialize)]
954        struct CommitRequest {
955            message: String,
956            metadata: HashMap<String, String>,
957        }
958
959        let mut metadata = HashMap::new();
960        metadata.insert("source".to_string(), "briefcase-ai".to_string());
961
962        let request = CommitRequest {
963            message: message.to_string(),
964            metadata,
965        };
966
967        let response = self
968            .send_authed(self.client.post(&url).json(&request))
969            .await?;
970
971        let status = response.status();
972        if !status.is_success() {
973            let error_text = response.text().await.unwrap_or_default();
974            return Err(StorageError::ConnectionError(format!(
975                "Commit failed with status {}: {}",
976                status, error_text
977            )));
978        }
979
980        #[derive(Deserialize)]
981        struct CommitResponse {
982            id: String,
983        }
984
985        let commit_response: CommitResponse = response.json().await.map_err(|e| {
986            StorageError::SerializationError(format!("Failed to parse commit response: {}", e))
987        })?;
988
989        Ok(commit_response.id)
990    }
991
992    // ── Object operations ────────────────────────────────────────────────
993
994    /// Upload object to LakeFS.
995    pub async fn upload_object(&self, path: &str, data: &[u8]) -> Result<(), StorageError> {
996        // lakeFS Cloud uses a three-step presigned-staging flow:
997        //   1. GET  /staging/backing?path=...&presign=true
998        //          → { physical_address, presigned_url, presigned_url_expiry }
999        //   2. PUT  presigned_url (direct to S3, no lakeFS auth)
1000        //          → S3 returns ETag
1001        //   3. PUT  /objects?path=... with JSON { physical_address, checksum, size_bytes }
1002        //          → lakeFS links the staged object into the branch
1003
1004        // ── Step 1: Obtain presigned staging location ─────────────────────────
1005        let staging_url = format!(
1006            "{}/api/v1/repositories/{}/branches/{}/staging/backing",
1007            self.endpoint, self.repository, self.branch
1008        );
1009
1010        let stage_resp = self
1011            .send_authed(
1012                self.client
1013                    .get(&staging_url)
1014                    .query(&[("path", path), ("presign", "true")]),
1015            )
1016            .await?;
1017
1018        if !stage_resp.status().is_success() {
1019            let status = stage_resp.status();
1020            let body = stage_resp.text().await.unwrap_or_default();
1021            return Err(StorageError::ConnectionError(format!(
1022                "Staging location request failed with status {}: {}",
1023                status, body
1024            )));
1025        }
1026
1027        let staging: serde_json::Value = stage_resp.json().await.map_err(|e| {
1028            StorageError::SerializationError(format!("Failed to parse staging response: {}", e))
1029        })?;
1030
1031        let physical_address = staging["physical_address"]
1032            .as_str()
1033            .ok_or_else(|| {
1034                StorageError::ConnectionError(format!(
1035                    "Staging response missing physical_address. Response: {}",
1036                    staging
1037                ))
1038            })?
1039            .to_string();
1040
1041        // lakeFS Cloud returns "presigned_url" (flat field), not "presign_info.url".
1042        let presign_url = staging["presigned_url"]
1043            .as_str()
1044            .ok_or_else(|| {
1045                StorageError::ConnectionError(format!(
1046                    "Staging response missing presigned_url. Response: {}",
1047                    staging
1048                ))
1049            })?
1050            .to_string();
1051
1052        // ── Step 2: PUT directly to presigned S3 URL (no lakeFS auth header) ──
1053        let s3_resp = self
1054            .client
1055            .put(&presign_url)
1056            .body(data.to_vec())
1057            .send()
1058            .await
1059            .map_err(|e| {
1060                StorageError::ConnectionError(format!("S3 presigned upload failed: {}", e))
1061            })?;
1062
1063        if !s3_resp.status().is_success() {
1064            let status = s3_resp.status();
1065            let body = s3_resp.text().await.unwrap_or_default();
1066            return Err(StorageError::ConnectionError(format!(
1067                "S3 presigned upload failed with status {}: {}",
1068                status, body
1069            )));
1070        }
1071
1072        // S3 returns the ETag (MD5 hex, quoted) — strip quotes for the checksum field.
1073        let etag = s3_resp
1074            .headers()
1075            .get("etag")
1076            .and_then(|v| v.to_str().ok())
1077            .unwrap_or("")
1078            .trim_matches('"')
1079            .to_string();
1080
1081        // ── Step 3: Link staged object into lakeFS branch via PUT /objects ─────
1082        // On this lakeFS Cloud instance PUT /objects expects a JSON staging-link
1083        // body (ObjectStageCreation), not a raw upload body.
1084        let objects_url = format!(
1085            "{}/api/v1/repositories/{}/branches/{}/objects",
1086            self.endpoint, self.repository, self.branch
1087        );
1088
1089        let link_body = serde_json::json!({
1090            "physical_address": physical_address,
1091            "checksum": etag,
1092            "size_bytes": data.len() as i64,
1093        });
1094
1095        let link_resp = self
1096            .send_authed(
1097                self.client
1098                    .put(&objects_url)
1099                    .query(&[("path", path)])
1100                    .json(&link_body),
1101            )
1102            .await?;
1103
1104        if !link_resp.status().is_success() {
1105            let status = link_resp.status();
1106            let body = link_resp.text().await.unwrap_or_default();
1107            return Err(StorageError::ConnectionError(format!(
1108                "Staging link failed with status {}: {}",
1109                status, body
1110            )));
1111        }
1112
1113        Ok(())
1114    }
1115
1116    /// Download object from LakeFS.
1117    pub async fn download_object(&self, path: &str) -> Result<Vec<u8>, StorageError> {
1118        let url = format!(
1119            "{}/api/v1/repositories/{}/refs/{}/objects",
1120            self.endpoint, self.repository, self.branch
1121        );
1122
1123        let response = self
1124            .send_authed(self.client.get(&url).query(&[("path", path)]))
1125            .await?;
1126
1127        if response.status() == reqwest::StatusCode::NOT_FOUND {
1128            return Err(StorageError::NotFound(format!(
1129                "Object not found: {}",
1130                path
1131            )));
1132        }
1133
1134        let status = response.status();
1135        if !status.is_success() {
1136            let error_text = response.text().await.unwrap_or_default();
1137            return Err(StorageError::ConnectionError(format!(
1138                "Download failed with status {}: {}",
1139                status, error_text
1140            )));
1141        }
1142
1143        let data = response.bytes().await.map_err(|e| {
1144            StorageError::ConnectionError(format!("Failed to read response: {}", e))
1145        })?;
1146
1147        Ok(data.to_vec())
1148    }
1149
1150    /// List objects with prefix.
1151    pub async fn list_objects(&self, prefix: &str) -> Result<Vec<String>, StorageError> {
1152        let url = format!(
1153            "{}/api/v1/repositories/{}/refs/{}/objects/ls",
1154            self.endpoint, self.repository, self.branch
1155        );
1156
1157        let response = self
1158            .send_authed(self.client.get(&url).query(&[("prefix", prefix)]))
1159            .await?;
1160
1161        let status = response.status();
1162        if !status.is_success() {
1163            let error_text = response.text().await.unwrap_or_default();
1164            return Err(StorageError::ConnectionError(format!(
1165                "List failed with status {}: {}",
1166                status, error_text
1167            )));
1168        }
1169
1170        #[derive(Deserialize)]
1171        struct ListResponse {
1172            results: Vec<ObjectInfo>,
1173        }
1174
1175        #[derive(Deserialize)]
1176        struct ObjectInfo {
1177            path: String,
1178            // lakeFS Cloud uses "path_type"; older/self-hosted lakeFS uses "type".
1179            #[serde(rename = "type", alias = "path_type")]
1180            object_type: String,
1181        }
1182
1183        let list_response: ListResponse = response.json().await.map_err(|e| {
1184            StorageError::SerializationError(format!("Failed to parse list response: {}", e))
1185        })?;
1186
1187        let paths = list_response
1188            .results
1189            .into_iter()
1190            .filter(|obj| obj.object_type == "object")
1191            .map(|obj| obj.path)
1192            .collect();
1193
1194        Ok(paths)
1195    }
1196
1197    /// Delete an object from the current branch.
1198    pub async fn delete_object(&self, path: &str) -> Result<(), StorageError> {
1199        if path.is_empty() {
1200            return Err(StorageError::InvalidQuery(
1201                "Object path cannot be empty".to_string(),
1202            ));
1203        }
1204
1205        let url = format!(
1206            "{}/api/v1/repositories/{}/branches/{}/objects",
1207            self.endpoint, self.repository, self.branch
1208        );
1209
1210        let response = self
1211            .send_authed(self.client.delete(&url).query(&[("path", path)]))
1212            .await?;
1213
1214        if response.status() == reqwest::StatusCode::NOT_FOUND {
1215            return Err(StorageError::NotFound(format!(
1216                "Object not found: {}",
1217                path
1218            )));
1219        }
1220
1221        let status = response.status();
1222        if !status.is_success() {
1223            let error_text = response.text().await.unwrap_or_default();
1224            return Err(StorageError::ConnectionError(format!(
1225                "Delete failed with status {}: {}",
1226                status, error_text
1227            )));
1228        }
1229
1230        Ok(())
1231    }
1232
1233    /// Get object statistics/metadata.
1234    pub async fn get_object_stats(&self, path: &str) -> Result<ObjectStats, StorageError> {
1235        if path.is_empty() {
1236            return Err(StorageError::InvalidQuery(
1237                "Object path cannot be empty".to_string(),
1238            ));
1239        }
1240
1241        let url = format!(
1242            "{}/api/v1/repositories/{}/refs/{}/objects/stat",
1243            self.endpoint, self.repository, self.branch
1244        );
1245
1246        let response = self
1247            .send_authed(self.client.get(&url).query(&[("path", path)]))
1248            .await?;
1249
1250        if response.status() == reqwest::StatusCode::NOT_FOUND {
1251            return Err(StorageError::NotFound(format!(
1252                "Object not found: {}",
1253                path
1254            )));
1255        }
1256
1257        let status = response.status();
1258        if !status.is_success() {
1259            let error_text = response.text().await.unwrap_or_default();
1260            return Err(StorageError::ConnectionError(format!(
1261                "Stats failed with status {}: {}",
1262                status, error_text
1263            )));
1264        }
1265
1266        let stats: ObjectStats = response.json().await.map_err(|e| {
1267            StorageError::SerializationError(format!("Failed to parse stats response: {}", e))
1268        })?;
1269
1270        Ok(stats)
1271    }
1272
1273    // ── Repository operations ────────────────────────────────────────────
1274
1275    /// Create a new LakeFS repository (engagement).
1276    pub async fn create_repository(
1277        &self,
1278        name: &str,
1279        storage_namespace: &str,
1280        default_branch: Option<&str>,
1281    ) -> Result<RepositoryInfo, StorageError> {
1282        if name.is_empty() {
1283            return Err(StorageError::InvalidQuery(
1284                "Repository name cannot be empty".to_string(),
1285            ));
1286        }
1287        if storage_namespace.is_empty() {
1288            return Err(StorageError::InvalidQuery(
1289                "Storage namespace cannot be empty".to_string(),
1290            ));
1291        }
1292
1293        let url = format!("{}/api/v1/repositories", self.endpoint);
1294
1295        #[derive(Serialize)]
1296        struct CreateRepoRequest {
1297            name: String,
1298            storage_namespace: String,
1299            default_branch: String,
1300        }
1301
1302        let request = CreateRepoRequest {
1303            name: name.to_string(),
1304            storage_namespace: storage_namespace.to_string(),
1305            default_branch: default_branch.unwrap_or("main").to_string(),
1306        };
1307
1308        let response = self
1309            .send_authed(self.client.post(&url).json(&request))
1310            .await?;
1311
1312        if response.status() == reqwest::StatusCode::CONFLICT {
1313            return Err(StorageError::ConnectionError(format!(
1314                "Repository '{}' already exists",
1315                name
1316            )));
1317        }
1318
1319        let status = response.status();
1320        if !status.is_success() {
1321            let error_text = response.text().await.unwrap_or_default();
1322            return Err(StorageError::ConnectionError(format!(
1323                "Create repository failed with status {}: {}",
1324                status, error_text
1325            )));
1326        }
1327
1328        let info: RepositoryInfo = response.json().await.map_err(|e| {
1329            StorageError::SerializationError(format!("Failed to parse repository response: {}", e))
1330        })?;
1331
1332        Ok(info)
1333    }
1334
1335    /// Delete a LakeFS repository.
1336    pub async fn delete_repository(&self, name: &str) -> Result<(), StorageError> {
1337        if name.is_empty() {
1338            return Err(StorageError::InvalidQuery(
1339                "Repository name cannot be empty".to_string(),
1340            ));
1341        }
1342
1343        let url = format!("{}/api/v1/repositories/{}", self.endpoint, name);
1344
1345        let response = self.send_authed(self.client.delete(&url)).await?;
1346
1347        if response.status() == reqwest::StatusCode::NOT_FOUND {
1348            return Err(StorageError::NotFound(format!(
1349                "Repository not found: {}",
1350                name
1351            )));
1352        }
1353
1354        let status = response.status();
1355        if !status.is_success() {
1356            let error_text = response.text().await.unwrap_or_default();
1357            return Err(StorageError::ConnectionError(format!(
1358                "Delete repository failed with status {}: {}",
1359                status, error_text
1360            )));
1361        }
1362
1363        Ok(())
1364    }
1365
1366    // ── Branch operations ────────────────────────────────────────────────
1367
1368    /// Create a new branch (workstream) from a source branch.
1369    pub async fn create_branch(
1370        &self,
1371        name: &str,
1372        source_branch: &str,
1373    ) -> Result<String, StorageError> {
1374        if name.is_empty() {
1375            return Err(StorageError::InvalidQuery(
1376                "Branch name cannot be empty".to_string(),
1377            ));
1378        }
1379        if source_branch.is_empty() {
1380            return Err(StorageError::InvalidQuery(
1381                "Source branch cannot be empty".to_string(),
1382            ));
1383        }
1384
1385        let url = format!(
1386            "{}/api/v1/repositories/{}/branches",
1387            self.endpoint, self.repository
1388        );
1389
1390        #[derive(Serialize)]
1391        struct CreateBranchRequest {
1392            name: String,
1393            source: String,
1394        }
1395
1396        let request = CreateBranchRequest {
1397            name: name.to_string(),
1398            source: source_branch.to_string(),
1399        };
1400
1401        let response = self
1402            .send_authed(self.client.post(&url).json(&request))
1403            .await?;
1404
1405        if response.status() == reqwest::StatusCode::CONFLICT {
1406            return Err(StorageError::ConnectionError(format!(
1407                "Branch '{}' already exists",
1408                name
1409            )));
1410        }
1411
1412        let status = response.status();
1413        if !status.is_success() {
1414            let error_text = response.text().await.unwrap_or_default();
1415            return Err(StorageError::ConnectionError(format!(
1416                "Create branch failed with status {}: {}",
1417                status, error_text
1418            )));
1419        }
1420
1421        // LakeFS returns the commit ID the new branch points to as a plain string
1422        let commit_id = response.text().await.map_err(|e| {
1423            StorageError::SerializationError(format!("Failed to read branch response: {}", e))
1424        })?;
1425
1426        // Strip surrounding quotes if present (LakeFS returns JSON string)
1427        let commit_id = commit_id.trim().trim_matches('"').to_string();
1428        Ok(commit_id)
1429    }
1430
1431    /// Delete a branch.
1432    pub async fn delete_branch(&self, name: &str) -> Result<(), StorageError> {
1433        if name.is_empty() {
1434            return Err(StorageError::InvalidQuery(
1435                "Branch name cannot be empty".to_string(),
1436            ));
1437        }
1438
1439        let url = format!(
1440            "{}/api/v1/repositories/{}/branches/{}",
1441            self.endpoint, self.repository, name
1442        );
1443
1444        let response = self.send_authed(self.client.delete(&url)).await?;
1445
1446        if response.status() == reqwest::StatusCode::NOT_FOUND {
1447            return Err(StorageError::NotFound(format!(
1448                "Branch not found: {}",
1449                name
1450            )));
1451        }
1452
1453        let status = response.status();
1454        if !status.is_success() {
1455            let error_text = response.text().await.unwrap_or_default();
1456            return Err(StorageError::ConnectionError(format!(
1457                "Delete branch failed with status {}: {}",
1458                status, error_text
1459            )));
1460        }
1461
1462        Ok(())
1463    }
1464
1465    /// Merge source branch into destination branch.
1466    pub async fn merge_branch(
1467        &self,
1468        source_branch: &str,
1469        destination_branch: &str,
1470        message: Option<&str>,
1471    ) -> Result<MergeResult, StorageError> {
1472        if source_branch.is_empty() || destination_branch.is_empty() {
1473            return Err(StorageError::InvalidQuery(
1474                "Source and destination branches cannot be empty".to_string(),
1475            ));
1476        }
1477
1478        let url = format!(
1479            "{}/api/v1/repositories/{}/refs/{}/merge/{}",
1480            self.endpoint, self.repository, source_branch, destination_branch
1481        );
1482
1483        #[derive(Serialize)]
1484        struct MergeRequest {
1485            message: String,
1486            metadata: HashMap<String, String>,
1487        }
1488
1489        let mut metadata = HashMap::new();
1490        metadata.insert("source".to_string(), "briefcase-ai".to_string());
1491
1492        let request = MergeRequest {
1493            message: message
1494                .unwrap_or(&format!(
1495                    "Merge {} into {}",
1496                    source_branch, destination_branch
1497                ))
1498                .to_string(),
1499            metadata,
1500        };
1501
1502        let response = self
1503            .send_authed(self.client.post(&url).json(&request))
1504            .await?;
1505
1506        if response.status() == reqwest::StatusCode::CONFLICT {
1507            return Err(StorageError::ConnectionError(
1508                "Merge conflict detected".to_string(),
1509            ));
1510        }
1511
1512        let status = response.status();
1513        if !status.is_success() {
1514            let error_text = response.text().await.unwrap_or_default();
1515            return Err(StorageError::ConnectionError(format!(
1516                "Merge failed with status {}: {}",
1517                status, error_text
1518            )));
1519        }
1520
1521        // LakeFS merge returns a reference (commit ID)
1522        #[derive(Deserialize)]
1523        struct MergeResponse {
1524            #[serde(default)]
1525            reference: String,
1526            #[serde(default)]
1527            summary: HashMap<String, serde_json::Value>,
1528        }
1529
1530        let merge_resp: MergeResponse = response.json().await.map_err(|e| {
1531            StorageError::SerializationError(format!("Failed to parse merge response: {}", e))
1532        })?;
1533
1534        Ok(MergeResult {
1535            commit_id: merge_resp.reference,
1536            summary: serde_json::to_string(&merge_resp.summary).unwrap_or_default(),
1537        })
1538    }
1539
1540    // ── Commit operations ────────────────────────────────────────────────
1541
1542    /// Get details of a specific commit.
1543    pub async fn get_commit(&self, commit_id: &str) -> Result<CommitInfo, StorageError> {
1544        if commit_id.is_empty() {
1545            return Err(StorageError::InvalidQuery(
1546                "Commit ID cannot be empty".to_string(),
1547            ));
1548        }
1549
1550        let url = format!(
1551            "{}/api/v1/repositories/{}/commits/{}",
1552            self.endpoint, self.repository, commit_id
1553        );
1554
1555        let response = self.send_authed(self.client.get(&url)).await?;
1556
1557        if response.status() == reqwest::StatusCode::NOT_FOUND {
1558            return Err(StorageError::NotFound(format!(
1559                "Commit not found: {}",
1560                commit_id
1561            )));
1562        }
1563
1564        let status = response.status();
1565        if !status.is_success() {
1566            let error_text = response.text().await.unwrap_or_default();
1567            return Err(StorageError::ConnectionError(format!(
1568                "Get commit failed with status {}: {}",
1569                status, error_text
1570            )));
1571        }
1572
1573        let info: CommitInfo = response.json().await.map_err(|e| {
1574            StorageError::SerializationError(format!("Failed to parse commit response: {}", e))
1575        })?;
1576
1577        Ok(info)
1578    }
1579
1580    // ── Diff operations ──────────────────────────────────────────────────
1581
1582    /// Diff between two LakeFS references (branches, tags, or commit IDs).
1583    pub async fn diff_refs(
1584        &self,
1585        left_ref: &str,
1586        right_ref: &str,
1587    ) -> Result<Vec<DiffEntry>, StorageError> {
1588        if left_ref.is_empty() || right_ref.is_empty() {
1589            return Err(StorageError::InvalidQuery(
1590                "Both refs must be non-empty".to_string(),
1591            ));
1592        }
1593
1594        let url = format!(
1595            "{}/api/v1/repositories/{}/refs/{}/diff/{}",
1596            self.endpoint, self.repository, left_ref, right_ref
1597        );
1598
1599        let response = self.send_authed(self.client.get(&url)).await?;
1600
1601        let status = response.status();
1602        if !status.is_success() {
1603            let error_text = response.text().await.unwrap_or_default();
1604            return Err(StorageError::ConnectionError(format!(
1605                "Diff failed with status {}: {}",
1606                status, error_text
1607            )));
1608        }
1609
1610        #[derive(Deserialize)]
1611        struct DiffResponse {
1612            results: Vec<DiffEntry>,
1613        }
1614
1615        let diff_resp: DiffResponse = response.json().await.map_err(|e| {
1616            StorageError::SerializationError(format!("Failed to parse diff response: {}", e))
1617        })?;
1618
1619        Ok(diff_resp.results)
1620    }
1621
1622    // ── Actions (webhook) operations ─────────────────────────────────────
1623
1624    /// Register a LakeFS Action by uploading its YAML definition to the
1625    /// `_lakefs_actions/` namespace on the current branch, then committing.
1626    pub async fn register_action(&self, action: &LakeFSAction) -> Result<String, StorageError> {
1627        if action.name.is_empty() {
1628            return Err(StorageError::InvalidQuery(
1629                "Action name cannot be empty".to_string(),
1630            ));
1631        }
1632        if action.on.is_empty() {
1633            return Err(StorageError::InvalidQuery(
1634                "Action must have at least one event trigger".to_string(),
1635            ));
1636        }
1637        if action.hooks.is_empty() {
1638            return Err(StorageError::InvalidQuery(
1639                "Action must have at least one hook".to_string(),
1640            ));
1641        }
1642
1643        // Build the YAML content for the action
1644        let yaml_content = self.build_action_yaml(action);
1645
1646        // Upload to _lakefs_actions/ namespace
1647        let path = format!("_lakefs_actions/{}.yaml", action.name);
1648        self.upload_object(&path, yaml_content.as_bytes()).await?;
1649
1650        // Commit the action definition
1651        let commit_msg = format!("Register LakeFS Action: {}", action.name);
1652        let commit_id = self.create_commit(&commit_msg).await?;
1653
1654        Ok(commit_id)
1655    }
1656
1657    /// Build YAML string for a LakeFS Action definition.
1658    fn build_action_yaml(&self, action: &LakeFSAction) -> String {
1659        let mut yaml = String::new();
1660        yaml.push_str(&format!("name: {}\n", action.name));
1661        yaml.push_str("on:\n");
1662        for event in &action.on {
1663            yaml.push_str(&format!("  {}:\n", event));
1664        }
1665        yaml.push_str("hooks:\n");
1666        for hook in &action.hooks {
1667            yaml.push_str(&format!("  - id: {}\n", hook.id));
1668            yaml.push_str(&format!("    type: {}\n", hook.hook_type));
1669            yaml.push_str(&format!("    description: {}\n", hook.description));
1670            if !hook.properties.is_empty() {
1671                yaml.push_str("    properties:\n");
1672                for (k, v) in &hook.properties {
1673                    yaml.push_str(&format!("      {}: {}\n", k, v));
1674                }
1675            }
1676        }
1677        yaml
1678    }
1679
1680    // ── Path helpers ─────────────────────────────────────────────────────
1681
    /// Generate object path for snapshot.
    ///
    /// Returns `snapshots/{snapshot_id}.json`; `snapshot_id` is inserted
    /// verbatim.
    fn snapshot_path(&self, snapshot_id: &str) -> String {
        format!("snapshots/{}.json", snapshot_id)
    }
1686
    /// Generate object path for decision.
    ///
    /// Returns `decisions/{decision_id}.json`; `decision_id` is inserted
    /// verbatim.
    fn decision_path(&self, decision_id: &str) -> String {
        format!("decisions/{}.json", decision_id)
    }
1691}
1692
1693// ─── StorageBackend trait impl ───────────────────────────────────────────────
1694
1695#[cfg(all(feature = "async", feature = "networking"))]
1696#[async_trait::async_trait]
1697impl StorageBackend for LakeFSBackend {
1698    async fn save(&self, snapshot: &Snapshot) -> Result<String, StorageError> {
1699        let snapshot_id = snapshot.metadata.snapshot_id.to_string();
1700
1701        // Add to pending writes instead of immediate upload
1702        {
1703            let mut pending = self.pending_writes.lock().unwrap();
1704            pending.push(snapshot.clone());
1705        }
1706
1707        Ok(snapshot_id)
1708    }
1709
1710    async fn save_decision(&self, decision: &DecisionSnapshot) -> Result<String, StorageError> {
1711        let decision_id = decision.metadata.snapshot_id.to_string();
1712        let path = self.decision_path(&decision_id);
1713
1714        let json_data = serde_json::to_vec(decision)
1715            .map_err(|e| StorageError::SerializationError(e.to_string()))?;
1716
1717        self.upload_object(&path, &json_data).await?;
1718
1719        Ok(decision_id)
1720    }
1721
1722    async fn load(&self, snapshot_id: &str) -> Result<Snapshot, StorageError> {
1723        let path = self.snapshot_path(snapshot_id);
1724        let data = self.download_object(&path).await?;
1725
1726        let snapshot: Snapshot = serde_json::from_slice(&data)
1727            .map_err(|e| StorageError::SerializationError(e.to_string()))?;
1728
1729        Ok(snapshot)
1730    }
1731
1732    async fn load_decision(&self, decision_id: &str) -> Result<DecisionSnapshot, StorageError> {
1733        let path = self.decision_path(decision_id);
1734        let data = self.download_object(&path).await?;
1735
1736        let decision: DecisionSnapshot = serde_json::from_slice(&data)
1737            .map_err(|e| StorageError::SerializationError(e.to_string()))?;
1738
1739        Ok(decision)
1740    }
1741
1742    async fn query(&self, query: SnapshotQuery) -> Result<Vec<Snapshot>, StorageError> {
1743        // List all snapshots
1744        let paths = self.list_objects("snapshots/").await?;
1745
1746        let mut snapshots = Vec::new();
1747        let mut count = 0;
1748        let offset = query.offset.unwrap_or(0);
1749        let limit = query.limit.unwrap_or(usize::MAX);
1750
1751        for path in paths {
1752            if let Some(filename) = path.split('/').next_back() {
1753                if let Some(snapshot_id) = filename.strip_suffix(".json") {
1754                    // Load snapshot to check filters
1755                    match self.load(snapshot_id).await {
1756                        Ok(snapshot) => {
1757                            // Apply filters
1758                            if self.matches_query(&snapshot, &query) {
1759                                if count >= offset {
1760                                    snapshots.push(snapshot);
1761                                    if snapshots.len() >= limit {
1762                                        break;
1763                                    }
1764                                }
1765                                count += 1;
1766                            }
1767                        }
1768                        Err(_) => continue, // Skip invalid snapshots
1769                    }
1770                }
1771            }
1772        }
1773
1774        // Sort by timestamp (newest first)
1775        snapshots.sort_by(|a, b| b.metadata.timestamp.cmp(&a.metadata.timestamp));
1776
1777        Ok(snapshots)
1778    }
1779
1780    async fn delete(&self, snapshot_id: &str) -> Result<bool, StorageError> {
1781        let path = self.snapshot_path(snapshot_id);
1782        // lakeFS DELETE is idempotent — it returns 204 whether or not the object
1783        // existed, so we can't infer existence from the delete response alone.
1784        // Stat first to get an authoritative answer, then delete only if found.
1785        match self.get_object_stats(&path).await {
1786            Ok(_) => {
1787                self.delete_object(&path).await?;
1788                Ok(true)
1789            }
1790            Err(StorageError::NotFound(_)) => Ok(false),
1791            Err(e) => Err(e),
1792        }
1793    }
1794
1795    async fn flush(&self) -> Result<FlushResult, StorageError> {
1796        let pending_snapshots = {
1797            let mut pending = self.pending_writes.lock().unwrap();
1798            let snapshots = pending.clone();
1799            pending.clear();
1800            snapshots
1801        };
1802
1803        if pending_snapshots.is_empty() {
1804            return Ok(FlushResult {
1805                snapshots_written: 0,
1806                bytes_written: 0,
1807                checkpoint_id: None,
1808            });
1809        }
1810
1811        let mut bytes_written = 0;
1812
1813        // Upload all pending snapshots
1814        for snapshot in &pending_snapshots {
1815            let snapshot_id = snapshot.metadata.snapshot_id.to_string();
1816            let path = self.snapshot_path(&snapshot_id);
1817
1818            let json_data = serde_json::to_vec(snapshot)
1819                .map_err(|e| StorageError::SerializationError(e.to_string()))?;
1820
1821            bytes_written += json_data.len();
1822
1823            self.upload_object(&path, &json_data).await?;
1824
1825            // Also upload individual decisions
1826            for decision in &snapshot.decisions {
1827                let decision_id = decision.metadata.snapshot_id.to_string();
1828                let decision_path = self.decision_path(&decision_id);
1829
1830                let decision_data = serde_json::to_vec(decision)
1831                    .map_err(|e| StorageError::SerializationError(e.to_string()))?;
1832
1833                bytes_written += decision_data.len();
1834                self.upload_object(&decision_path, &decision_data).await?;
1835            }
1836        }
1837
1838        // Create commit
1839        let commit_message = format!("Briefcase AI flush: {} snapshots", pending_snapshots.len());
1840        let commit_id = self.create_commit(&commit_message).await?;
1841
1842        Ok(FlushResult {
1843            snapshots_written: pending_snapshots.len(),
1844            bytes_written,
1845            checkpoint_id: Some(commit_id),
1846        })
1847    }
1848
1849    async fn health_check(&self) -> Result<bool, StorageError> {
1850        let url = format!("{}/api/v1/repositories/{}", self.endpoint, self.repository);
1851
1852        let response = self
1853            .send_authed(self.client.get(&url))
1854            .await
1855            .map_err(|e| StorageError::ConnectionError(format!("Health check failed: {}", e)))?;
1856
1857        Ok(response.status().is_success())
1858    }
1859}
1860
#[cfg(all(feature = "async", feature = "networking"))]
impl LakeFSBackend {
    /// Decide whether `snapshot` satisfies every filter set on `query`.
    ///
    /// * The snapshot timestamp must lie within `[start_time, end_time]`
    ///   (both bounds inclusive, each optional).
    /// * If any of `function_name`, `module_name`, `model_name` or `tags` is
    ///   set, at least one decision in the snapshot must satisfy all of the
    ///   set filters at once. A snapshot without decisions therefore never
    ///   matches such a query.
    fn matches_query(&self, snapshot: &Snapshot, query: &SnapshotQuery) -> bool {
        // Time window: reject anything before the start or after the end.
        let too_early = query
            .start_time
            .map_or(false, |start| snapshot.metadata.timestamp < start);
        let too_late = query
            .end_time
            .map_or(false, |end| snapshot.metadata.timestamp > end);
        if too_early || too_late {
            return false;
        }

        let has_decision_filters = query.function_name.is_some()
            || query.module_name.is_some()
            || query.model_name.is_some()
            || query.tags.is_some();
        if !has_decision_filters {
            return true;
        }

        // An unset filter is treated as satisfied; a set one must hold for the
        // same decision as all the others.
        snapshot.decisions.iter().any(|decision| {
            let function_ok = query
                .function_name
                .as_ref()
                .map_or(true, |wanted| decision.function_name == *wanted);

            let module_ok = query
                .module_name
                .as_ref()
                .map_or(true, |wanted| decision.module_name.as_ref() == Some(wanted));

            // A decision without model parameters cannot match a model filter.
            let model_ok = query.model_name.as_ref().map_or(true, |wanted| {
                decision
                    .model_parameters
                    .as_ref()
                    .map_or(false, |params| params.model_name == *wanted)
            });

            // Every requested tag must be present with exactly the requested value.
            let tags_ok = query.tags.as_ref().map_or(true, |wanted| {
                wanted
                    .iter()
                    .all(|(key, value)| decision.tags.get(key) == Some(value))
            });

            function_ok && module_ok && model_ok && tags_ok
        })
    }
}
1934
1935// ─── Config ──────────────────────────────────────────────────────────────────
1936
1937#[derive(Debug, Clone)]
1938pub struct LakeFSConfig {
1939    pub endpoint: String,
1940    pub repository: String,
1941    pub branch: String,
1942    pub access_key: String,
1943    pub secret_key: String,
1944    /// Optional explicit auth mode.  When `None`, falls back to
1945    /// `LakeFSAuth::Basic { access_key, secret_key }`.
1946    pub auth: Option<LakeFSAuth>,
1947}
1948
1949impl LakeFSConfig {
1950    pub fn new(
1951        endpoint: impl Into<String>,
1952        repository: impl Into<String>,
1953        branch: impl Into<String>,
1954        access_key: impl Into<String>,
1955        secret_key: impl Into<String>,
1956    ) -> Self {
1957        Self {
1958            endpoint: endpoint.into(),
1959            repository: repository.into(),
1960            branch: branch.into(),
1961            access_key: access_key.into(),
1962            secret_key: secret_key.into(),
1963            auth: None,
1964        }
1965    }
1966
1967    /// Set the authentication mode explicitly.
1968    pub fn with_auth(mut self, auth: LakeFSAuth) -> Self {
1969        self.auth = Some(auth);
1970        self
1971    }
1972}
1973
1974// ─── Tests ───────────────────────────────────────────────────────────────────
1975
1976#[cfg(test)]
1977mod tests {
1978    use super::*;
1979    use crate::models::*;
1980    use serde_json::json;
1981    use wiremock::matchers::{method, path, query_param};
1982    use wiremock::{Mock, MockServer, ResponseTemplate};
1983
1984    fn create_test_config_with_endpoint(endpoint: &str) -> LakeFSConfig {
1985        LakeFSConfig::new(
1986            endpoint,
1987            "briefcase-test",
1988            "main",
1989            "test_key",
1990            "test_secret",
1991        )
1992    }
1993
1994    fn create_test_config() -> LakeFSConfig {
1995        LakeFSConfig::new(
1996            "http://localhost:8000",
1997            "briefcase-test",
1998            "main",
1999            "test_access_key",
2000            "test_secret_key",
2001        )
2002    }
2003
2004    async fn create_test_snapshot() -> Snapshot {
2005        let input = Input::new("test_input", json!("value"), "string");
2006        let output = Output::new("test_output", json!("result"), "string");
2007        let model_params = ModelParameters::new("gpt-4");
2008
2009        let decision = DecisionSnapshot::new("test_function")
2010            .with_module("test_module")
2011            .add_input(input)
2012            .add_output(output)
2013            .with_model_parameters(model_params)
2014            .add_tag("env", "test");
2015
2016        let mut snapshot = Snapshot::new(SnapshotType::Session);
2017        snapshot.add_decision(decision);
2018        snapshot
2019    }
2020
2021    // ── Existing unit tests (non-HTTP) ───────────────────────────────────
2022
2023    #[tokio::test]
2024    async fn test_lakefs_config_creation() {
2025        let config = create_test_config();
2026        assert_eq!(config.endpoint, "http://localhost:8000");
2027        assert_eq!(config.repository, "briefcase-test");
2028        assert_eq!(config.branch, "main");
2029    }
2030
2031    #[tokio::test]
2032    async fn test_object_paths() {
2033        let config = create_test_config();
2034        let backend = LakeFSBackend::new(config).unwrap();
2035
2036        let snapshot_id = "test-snapshot-123";
2037        let decision_id = "test-decision-456";
2038
2039        assert_eq!(
2040            backend.snapshot_path(snapshot_id),
2041            "snapshots/test-snapshot-123.json"
2042        );
2043        assert_eq!(
2044            backend.decision_path(decision_id),
2045            "decisions/test-decision-456.json"
2046        );
2047    }
2048
2049    #[tokio::test]
2050    async fn test_query_matching() {
2051        let config = create_test_config();
2052        let backend = LakeFSBackend::new(config).unwrap();
2053        let snapshot = create_test_snapshot().await;
2054
2055        // Test function name matching
2056        let query = SnapshotQuery::new().with_function_name("test_function");
2057        assert!(backend.matches_query(&snapshot, &query));
2058
2059        let query = SnapshotQuery::new().with_function_name("other_function");
2060        assert!(!backend.matches_query(&snapshot, &query));
2061
2062        // Test tag matching
2063        let query = SnapshotQuery::new().with_tag("env", "test");
2064        assert!(backend.matches_query(&snapshot, &query));
2065
2066        let query = SnapshotQuery::new().with_tag("env", "prod");
2067        assert!(!backend.matches_query(&snapshot, &query));
2068
2069        // Test model name matching
2070        let query = SnapshotQuery::new().with_model_name("gpt-4");
2071        assert!(backend.matches_query(&snapshot, &query));
2072
2073        let query = SnapshotQuery::new().with_model_name("claude-3");
2074        assert!(!backend.matches_query(&snapshot, &query));
2075    }
2076
2077    #[tokio::test]
2078    async fn test_pending_writes() {
2079        let config = create_test_config();
2080        let backend = LakeFSBackend::new(config).unwrap();
2081        let snapshot = create_test_snapshot().await;
2082
2083        // Save should add to pending writes
2084        let snapshot_id = backend.save(&snapshot).await.unwrap();
2085        assert_eq!(snapshot_id, snapshot.metadata.snapshot_id.to_string());
2086
2087        // Check pending writes
2088        {
2089            let pending = backend.pending_writes.lock().unwrap();
2090            assert_eq!(pending.len(), 1);
2091        }
2092    }
2093
2094    #[tokio::test]
2095    async fn test_accessors() {
2096        let config = create_test_config();
2097        let backend = LakeFSBackend::new(config).unwrap();
2098
2099        assert_eq!(backend.repository(), "briefcase-test");
2100        assert_eq!(backend.branch(), "main");
2101        assert_eq!(backend.endpoint(), "http://localhost:8000");
2102    }
2103
2104    // ── Edge cases (input validation) ────────────────────────────────────
2105
2106    #[tokio::test]
2107    async fn test_create_repository_empty_name() {
2108        let config = create_test_config();
2109        let backend = LakeFSBackend::new(config).unwrap();
2110
2111        let result = backend.create_repository("", "s3://bucket", None).await;
2112        assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2113    }
2114
2115    #[tokio::test]
2116    async fn test_create_repository_empty_namespace() {
2117        let config = create_test_config();
2118        let backend = LakeFSBackend::new(config).unwrap();
2119
2120        let result = backend.create_repository("my-repo", "", None).await;
2121        assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2122    }
2123
2124    #[tokio::test]
2125    async fn test_delete_repository_empty_name() {
2126        let config = create_test_config();
2127        let backend = LakeFSBackend::new(config).unwrap();
2128
2129        let result = backend.delete_repository("").await;
2130        assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2131    }
2132
2133    #[tokio::test]
2134    async fn test_create_branch_empty_name() {
2135        let config = create_test_config();
2136        let backend = LakeFSBackend::new(config).unwrap();
2137
2138        let result = backend.create_branch("", "main").await;
2139        assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2140    }
2141
2142    #[tokio::test]
2143    async fn test_create_branch_empty_source() {
2144        let config = create_test_config();
2145        let backend = LakeFSBackend::new(config).unwrap();
2146
2147        let result = backend.create_branch("feature", "").await;
2148        assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2149    }
2150
2151    #[tokio::test]
2152    async fn test_delete_branch_empty_name() {
2153        let config = create_test_config();
2154        let backend = LakeFSBackend::new(config).unwrap();
2155
2156        let result = backend.delete_branch("").await;
2157        assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2158    }
2159
2160    #[tokio::test]
2161    async fn test_merge_branch_empty_source() {
2162        let config = create_test_config();
2163        let backend = LakeFSBackend::new(config).unwrap();
2164
2165        let result = backend.merge_branch("", "main", None).await;
2166        assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2167    }
2168
2169    #[tokio::test]
2170    async fn test_merge_branch_empty_destination() {
2171        let config = create_test_config();
2172        let backend = LakeFSBackend::new(config).unwrap();
2173
2174        let result = backend.merge_branch("feature", "", None).await;
2175        assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2176    }
2177
2178    #[tokio::test]
2179    async fn test_get_commit_empty_id() {
2180        let config = create_test_config();
2181        let backend = LakeFSBackend::new(config).unwrap();
2182
2183        let result = backend.get_commit("").await;
2184        assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2185    }
2186
2187    #[tokio::test]
2188    async fn test_diff_refs_empty_left() {
2189        let config = create_test_config();
2190        let backend = LakeFSBackend::new(config).unwrap();
2191
2192        let result = backend.diff_refs("", "main").await;
2193        assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2194    }
2195
2196    #[tokio::test]
2197    async fn test_diff_refs_empty_right() {
2198        let config = create_test_config();
2199        let backend = LakeFSBackend::new(config).unwrap();
2200
2201        let result = backend.diff_refs("feature", "").await;
2202        assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2203    }
2204
2205    #[tokio::test]
2206    async fn test_delete_object_empty_path() {
2207        let config = create_test_config();
2208        let backend = LakeFSBackend::new(config).unwrap();
2209
2210        let result = backend.delete_object("").await;
2211        assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2212    }
2213
2214    #[tokio::test]
2215    async fn test_get_object_stats_empty_path() {
2216        let config = create_test_config();
2217        let backend = LakeFSBackend::new(config).unwrap();
2218
2219        let result = backend.get_object_stats("").await;
2220        assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2221    }
2222
2223    #[tokio::test]
2224    async fn test_register_action_empty_name() {
2225        let config = create_test_config();
2226        let backend = LakeFSBackend::new(config).unwrap();
2227
2228        let action = LakeFSAction {
2229            name: "".to_string(),
2230            on: vec!["post-commit".to_string()],
2231            hooks: vec![ActionHook {
2232                id: "hook1".to_string(),
2233                hook_type: "webhook".to_string(),
2234                description: "test".to_string(),
2235                properties: HashMap::new(),
2236            }],
2237        };
2238
2239        let result = backend.register_action(&action).await;
2240        assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2241    }
2242
2243    #[tokio::test]
2244    async fn test_register_action_no_events() {
2245        let config = create_test_config();
2246        let backend = LakeFSBackend::new(config).unwrap();
2247
2248        let action = LakeFSAction {
2249            name: "test-action".to_string(),
2250            on: vec![],
2251            hooks: vec![ActionHook {
2252                id: "hook1".to_string(),
2253                hook_type: "webhook".to_string(),
2254                description: "test".to_string(),
2255                properties: HashMap::new(),
2256            }],
2257        };
2258
2259        let result = backend.register_action(&action).await;
2260        assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2261    }
2262
2263    #[tokio::test]
2264    async fn test_register_action_no_hooks() {
2265        let config = create_test_config();
2266        let backend = LakeFSBackend::new(config).unwrap();
2267
2268        let action = LakeFSAction {
2269            name: "test-action".to_string(),
2270            on: vec!["post-commit".to_string()],
2271            hooks: vec![],
2272        };
2273
2274        let result = backend.register_action(&action).await;
2275        assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2276    }
2277
2278    // ── YAML builder tests ───────────────────────────────────────────────
2279
2280    #[tokio::test]
2281    async fn test_build_action_yaml() {
2282        let config = create_test_config();
2283        let backend = LakeFSBackend::new(config).unwrap();
2284
2285        let mut props = HashMap::new();
2286        props.insert("url".to_string(), "http://localhost:9000/hook".to_string());
2287
2288        let action = LakeFSAction {
2289            name: "briefcase-post-commit".to_string(),
2290            on: vec!["post-commit".to_string()],
2291            hooks: vec![ActionHook {
2292                id: "notify".to_string(),
2293                hook_type: "webhook".to_string(),
2294                description: "Notify Briefcase server".to_string(),
2295                properties: props,
2296            }],
2297        };
2298
2299        let yaml = backend.build_action_yaml(&action);
2300        assert!(yaml.contains("name: briefcase-post-commit"));
2301        assert!(yaml.contains("post-commit:"));
2302        assert!(yaml.contains("id: notify"));
2303        assert!(yaml.contains("type: webhook"));
2304        assert!(yaml.contains("url: http://localhost:9000/hook"));
2305    }
2306
2307    #[tokio::test]
2308    async fn test_build_action_yaml_multiple_events() {
2309        let config = create_test_config();
2310        let backend = LakeFSBackend::new(config).unwrap();
2311
2312        let action = LakeFSAction {
2313            name: "multi-event".to_string(),
2314            on: vec!["pre-commit".to_string(), "post-merge".to_string()],
2315            hooks: vec![ActionHook {
2316                id: "h1".to_string(),
2317                hook_type: "webhook".to_string(),
2318                description: "Hook".to_string(),
2319                properties: HashMap::new(),
2320            }],
2321        };
2322
2323        let yaml = backend.build_action_yaml(&action);
2324        assert!(yaml.contains("pre-commit:"));
2325        assert!(yaml.contains("post-merge:"));
2326    }
2327
2328    // ── Flush edge case: empty pending writes ────────────────────────────
2329
2330    #[tokio::test]
2331    async fn test_flush_empty_pending() {
2332        let config = create_test_config();
2333        let backend = LakeFSBackend::new(config).unwrap();
2334
2335        let result = backend.flush().await.unwrap();
2336        assert_eq!(result.snapshots_written, 0);
2337        assert_eq!(result.bytes_written, 0);
2338        assert!(result.checkpoint_id.is_none());
2339    }
2340
2341    // ── HTTP mock tests (wiremock) ───────────────────────────────────────
2342
2343    #[tokio::test]
2344    async fn test_create_repository_success() {
2345        let mock_server = MockServer::start().await;
2346        let config = create_test_config_with_endpoint(&mock_server.uri());
2347        let backend = LakeFSBackend::new(config).unwrap();
2348
2349        Mock::given(method("POST"))
2350            .and(path("/api/v1/repositories"))
2351            .respond_with(ResponseTemplate::new(201).set_body_json(json!({
2352                "id": "my-engagement",
2353                "storage_namespace": "s3://my-bucket/engagement",
2354                "default_branch": "main",
2355                "creation_date": 1700000000
2356            })))
2357            .expect(1)
2358            .mount(&mock_server)
2359            .await;
2360
2361        let result = backend
2362            .create_repository("my-engagement", "s3://my-bucket/engagement", None)
2363            .await
2364            .unwrap();
2365
2366        assert_eq!(result.id, "my-engagement");
2367        assert_eq!(result.storage_namespace, "s3://my-bucket/engagement");
2368        assert_eq!(result.default_branch, "main");
2369    }
2370
2371    #[tokio::test]
2372    async fn test_create_repository_conflict() {
2373        let mock_server = MockServer::start().await;
2374        let config = create_test_config_with_endpoint(&mock_server.uri());
2375        let backend = LakeFSBackend::new(config).unwrap();
2376
2377        Mock::given(method("POST"))
2378            .and(path("/api/v1/repositories"))
2379            .respond_with(ResponseTemplate::new(409).set_body_json(json!({
2380                "message": "repository already exists"
2381            })))
2382            .expect(1)
2383            .mount(&mock_server)
2384            .await;
2385
2386        let result = backend
2387            .create_repository("existing-repo", "s3://bucket", None)
2388            .await;
2389
2390        assert!(result.is_err());
2391        let err = result.unwrap_err();
2392        assert!(
2393            matches!(err, StorageError::ConnectionError(msg) if msg.contains("already exists"))
2394        );
2395    }
2396
2397    #[tokio::test]
2398    async fn test_create_repository_custom_branch() {
2399        let mock_server = MockServer::start().await;
2400        let config = create_test_config_with_endpoint(&mock_server.uri());
2401        let backend = LakeFSBackend::new(config).unwrap();
2402
2403        Mock::given(method("POST"))
2404            .and(path("/api/v1/repositories"))
2405            .respond_with(ResponseTemplate::new(201).set_body_json(json!({
2406                "id": "my-repo",
2407                "storage_namespace": "s3://bucket",
2408                "default_branch": "develop",
2409                "creation_date": 1700000000
2410            })))
2411            .expect(1)
2412            .mount(&mock_server)
2413            .await;
2414
2415        let result = backend
2416            .create_repository("my-repo", "s3://bucket", Some("develop"))
2417            .await
2418            .unwrap();
2419
2420        assert_eq!(result.default_branch, "develop");
2421    }
2422
2423    #[tokio::test]
2424    async fn test_delete_repository_success() {
2425        let mock_server = MockServer::start().await;
2426        let config = create_test_config_with_endpoint(&mock_server.uri());
2427        let backend = LakeFSBackend::new(config).unwrap();
2428
2429        Mock::given(method("DELETE"))
2430            .and(path("/api/v1/repositories/test-repo"))
2431            .respond_with(ResponseTemplate::new(204))
2432            .expect(1)
2433            .mount(&mock_server)
2434            .await;
2435
2436        backend.delete_repository("test-repo").await.unwrap();
2437    }
2438
2439    #[tokio::test]
2440    async fn test_delete_repository_not_found() {
2441        let mock_server = MockServer::start().await;
2442        let config = create_test_config_with_endpoint(&mock_server.uri());
2443        let backend = LakeFSBackend::new(config).unwrap();
2444
2445        Mock::given(method("DELETE"))
2446            .and(path("/api/v1/repositories/nonexistent"))
2447            .respond_with(ResponseTemplate::new(404))
2448            .expect(1)
2449            .mount(&mock_server)
2450            .await;
2451
2452        let result = backend.delete_repository("nonexistent").await;
2453        assert!(matches!(result, Err(StorageError::NotFound(_))));
2454    }
2455
2456    #[tokio::test]
2457    async fn test_create_branch_success() {
2458        let mock_server = MockServer::start().await;
2459        let config = create_test_config_with_endpoint(&mock_server.uri());
2460        let backend = LakeFSBackend::new(config).unwrap();
2461
2462        Mock::given(method("POST"))
2463            .and(path("/api/v1/repositories/briefcase-test/branches"))
2464            .respond_with(ResponseTemplate::new(201).set_body_string("\"abc123def456\""))
2465            .expect(1)
2466            .mount(&mock_server)
2467            .await;
2468
2469        let commit_id = backend.create_branch("feature-x", "main").await.unwrap();
2470        assert_eq!(commit_id, "abc123def456");
2471    }
2472
2473    #[tokio::test]
2474    async fn test_create_branch_conflict() {
2475        let mock_server = MockServer::start().await;
2476        let config = create_test_config_with_endpoint(&mock_server.uri());
2477        let backend = LakeFSBackend::new(config).unwrap();
2478
2479        Mock::given(method("POST"))
2480            .and(path("/api/v1/repositories/briefcase-test/branches"))
2481            .respond_with(ResponseTemplate::new(409))
2482            .expect(1)
2483            .mount(&mock_server)
2484            .await;
2485
2486        let result = backend.create_branch("existing-branch", "main").await;
2487        assert!(result.is_err());
2488    }
2489
2490    #[tokio::test]
2491    async fn test_delete_branch_success() {
2492        let mock_server = MockServer::start().await;
2493        let config = create_test_config_with_endpoint(&mock_server.uri());
2494        let backend = LakeFSBackend::new(config).unwrap();
2495
2496        Mock::given(method("DELETE"))
2497            .and(path(
2498                "/api/v1/repositories/briefcase-test/branches/feature-x",
2499            ))
2500            .respond_with(ResponseTemplate::new(204))
2501            .expect(1)
2502            .mount(&mock_server)
2503            .await;
2504
2505        backend.delete_branch("feature-x").await.unwrap();
2506    }
2507
2508    #[tokio::test]
2509    async fn test_delete_branch_not_found() {
2510        let mock_server = MockServer::start().await;
2511        let config = create_test_config_with_endpoint(&mock_server.uri());
2512        let backend = LakeFSBackend::new(config).unwrap();
2513
2514        Mock::given(method("DELETE"))
2515            .and(path(
2516                "/api/v1/repositories/briefcase-test/branches/nonexistent",
2517            ))
2518            .respond_with(ResponseTemplate::new(404))
2519            .expect(1)
2520            .mount(&mock_server)
2521            .await;
2522
2523        let result = backend.delete_branch("nonexistent").await;
2524        assert!(matches!(result, Err(StorageError::NotFound(_))));
2525    }
2526
2527    #[tokio::test]
2528    async fn test_merge_branch_success() {
2529        let mock_server = MockServer::start().await;
2530        let config = create_test_config_with_endpoint(&mock_server.uri());
2531        let backend = LakeFSBackend::new(config).unwrap();
2532
2533        Mock::given(method("POST"))
2534            .and(path(
2535                "/api/v1/repositories/briefcase-test/refs/feature-x/merge/main",
2536            ))
2537            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2538                "reference": "merge-commit-abc123",
2539                "summary": {"added": 3, "removed": 0, "changed": 1}
2540            })))
2541            .expect(1)
2542            .mount(&mock_server)
2543            .await;
2544
2545        let result = backend
2546            .merge_branch("feature-x", "main", Some("Merge feature-x"))
2547            .await
2548            .unwrap();
2549
2550        assert_eq!(result.commit_id, "merge-commit-abc123");
2551        assert!(result.summary.contains("added"));
2552    }
2553
2554    #[tokio::test]
2555    async fn test_merge_branch_conflict() {
2556        let mock_server = MockServer::start().await;
2557        let config = create_test_config_with_endpoint(&mock_server.uri());
2558        let backend = LakeFSBackend::new(config).unwrap();
2559
2560        Mock::given(method("POST"))
2561            .and(path(
2562                "/api/v1/repositories/briefcase-test/refs/feature-x/merge/main",
2563            ))
2564            .respond_with(ResponseTemplate::new(409).set_body_json(json!({
2565                "message": "conflict"
2566            })))
2567            .expect(1)
2568            .mount(&mock_server)
2569            .await;
2570
2571        let result = backend.merge_branch("feature-x", "main", None).await;
2572        assert!(result.is_err());
2573        let err = result.unwrap_err();
2574        assert!(matches!(err, StorageError::ConnectionError(msg) if msg.contains("conflict")));
2575    }
2576
2577    #[tokio::test]
2578    async fn test_merge_branch_default_message() {
2579        let mock_server = MockServer::start().await;
2580        let config = create_test_config_with_endpoint(&mock_server.uri());
2581        let backend = LakeFSBackend::new(config).unwrap();
2582
2583        Mock::given(method("POST"))
2584            .and(path(
2585                "/api/v1/repositories/briefcase-test/refs/src/merge/dst",
2586            ))
2587            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2588                "reference": "commit-xyz",
2589                "summary": {}
2590            })))
2591            .expect(1)
2592            .mount(&mock_server)
2593            .await;
2594
2595        let result = backend.merge_branch("src", "dst", None).await.unwrap();
2596        assert_eq!(result.commit_id, "commit-xyz");
2597    }
2598
2599    #[tokio::test]
2600    async fn test_get_commit_success() {
2601        let mock_server = MockServer::start().await;
2602        let config = create_test_config_with_endpoint(&mock_server.uri());
2603        let backend = LakeFSBackend::new(config).unwrap();
2604
2605        Mock::given(method("GET"))
2606            .and(path("/api/v1/repositories/briefcase-test/commits/abc123"))
2607            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2608                "id": "abc123",
2609                "message": "Initial commit",
2610                "committer": "admin",
2611                "metadata": {"source": "briefcase-ai"},
2612                "creation_date": 1700000000
2613            })))
2614            .expect(1)
2615            .mount(&mock_server)
2616            .await;
2617
2618        let commit = backend.get_commit("abc123").await.unwrap();
2619        assert_eq!(commit.id, "abc123");
2620        assert_eq!(commit.message, "Initial commit");
2621        assert_eq!(commit.committer, "admin");
2622        assert_eq!(
2623            commit.metadata.get("source"),
2624            Some(&"briefcase-ai".to_string())
2625        );
2626    }
2627
2628    #[tokio::test]
2629    async fn test_get_commit_not_found() {
2630        let mock_server = MockServer::start().await;
2631        let config = create_test_config_with_endpoint(&mock_server.uri());
2632        let backend = LakeFSBackend::new(config).unwrap();
2633
2634        Mock::given(method("GET"))
2635            .and(path(
2636                "/api/v1/repositories/briefcase-test/commits/nonexistent",
2637            ))
2638            .respond_with(ResponseTemplate::new(404))
2639            .expect(1)
2640            .mount(&mock_server)
2641            .await;
2642
2643        let result = backend.get_commit("nonexistent").await;
2644        assert!(matches!(result, Err(StorageError::NotFound(_))));
2645    }
2646
2647    #[tokio::test]
2648    async fn test_diff_refs_success() {
2649        let mock_server = MockServer::start().await;
2650        let config = create_test_config_with_endpoint(&mock_server.uri());
2651        let backend = LakeFSBackend::new(config).unwrap();
2652
2653        Mock::given(method("GET"))
2654            .and(path(
2655                "/api/v1/repositories/briefcase-test/refs/main/diff/feature-x",
2656            ))
2657            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2658                "results": [
2659                    {"path": "snapshots/a.json", "type": "added", "size_bytes": 1024},
2660                    {"path": "snapshots/b.json", "type": "changed", "size_bytes": 2048},
2661                    {"path": "decisions/old.json", "type": "removed"}
2662                ]
2663            })))
2664            .expect(1)
2665            .mount(&mock_server)
2666            .await;
2667
2668        let diffs = backend.diff_refs("main", "feature-x").await.unwrap();
2669        assert_eq!(diffs.len(), 3);
2670        assert_eq!(diffs[0].path, "snapshots/a.json");
2671        assert_eq!(diffs[0].diff_type, "added");
2672        assert_eq!(diffs[0].size_bytes, Some(1024));
2673        assert_eq!(diffs[1].diff_type, "changed");
2674        assert_eq!(diffs[2].diff_type, "removed");
2675        assert_eq!(diffs[2].size_bytes, None);
2676    }
2677
2678    #[tokio::test]
2679    async fn test_diff_refs_empty_result() {
2680        let mock_server = MockServer::start().await;
2681        let config = create_test_config_with_endpoint(&mock_server.uri());
2682        let backend = LakeFSBackend::new(config).unwrap();
2683
2684        Mock::given(method("GET"))
2685            .and(path(
2686                "/api/v1/repositories/briefcase-test/refs/main/diff/main",
2687            ))
2688            .respond_with(ResponseTemplate::new(200).set_body_json(json!({"results": []})))
2689            .expect(1)
2690            .mount(&mock_server)
2691            .await;
2692
2693        let diffs = backend.diff_refs("main", "main").await.unwrap();
2694        assert!(diffs.is_empty());
2695    }
2696
2697    #[tokio::test]
2698    async fn test_get_object_stats_success() {
2699        let mock_server = MockServer::start().await;
2700        let config = create_test_config_with_endpoint(&mock_server.uri());
2701        let backend = LakeFSBackend::new(config).unwrap();
2702
2703        Mock::given(method("GET"))
2704            .and(path(
2705                "/api/v1/repositories/briefcase-test/refs/main/objects/stat",
2706            ))
2707            .and(query_param("path", "snapshots/test.json"))
2708            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2709                "path": "snapshots/test.json",
2710                "physical_address": "s3://bucket/data/abc",
2711                "size_bytes": 4096,
2712                "content_type": "application/json",
2713                "mtime": 1700000000,
2714                "checksum": "sha256:abcdef"
2715            })))
2716            .expect(1)
2717            .mount(&mock_server)
2718            .await;
2719
2720        let stats = backend
2721            .get_object_stats("snapshots/test.json")
2722            .await
2723            .unwrap();
2724
2725        assert_eq!(stats.path, "snapshots/test.json");
2726        assert_eq!(stats.size_bytes, 4096);
2727        assert_eq!(stats.content_type, "application/json");
2728        assert_eq!(stats.checksum, "sha256:abcdef");
2729    }
2730
2731    #[tokio::test]
2732    async fn test_get_object_stats_not_found() {
2733        let mock_server = MockServer::start().await;
2734        let config = create_test_config_with_endpoint(&mock_server.uri());
2735        let backend = LakeFSBackend::new(config).unwrap();
2736
2737        Mock::given(method("GET"))
2738            .and(path(
2739                "/api/v1/repositories/briefcase-test/refs/main/objects/stat",
2740            ))
2741            .respond_with(ResponseTemplate::new(404))
2742            .expect(1)
2743            .mount(&mock_server)
2744            .await;
2745
2746        let result = backend.get_object_stats("nonexistent.json").await;
2747        assert!(matches!(result, Err(StorageError::NotFound(_))));
2748    }
2749
2750    #[tokio::test]
2751    async fn test_delete_object_success() {
2752        let mock_server = MockServer::start().await;
2753        let config = create_test_config_with_endpoint(&mock_server.uri());
2754        let backend = LakeFSBackend::new(config).unwrap();
2755
2756        Mock::given(method("DELETE"))
2757            .and(path(
2758                "/api/v1/repositories/briefcase-test/branches/main/objects",
2759            ))
2760            .and(query_param("path", "snapshots/old.json"))
2761            .respond_with(ResponseTemplate::new(204))
2762            .expect(1)
2763            .mount(&mock_server)
2764            .await;
2765
2766        backend.delete_object("snapshots/old.json").await.unwrap();
2767    }
2768
2769    #[tokio::test]
2770    async fn test_delete_object_not_found() {
2771        let mock_server = MockServer::start().await;
2772        let config = create_test_config_with_endpoint(&mock_server.uri());
2773        let backend = LakeFSBackend::new(config).unwrap();
2774
2775        Mock::given(method("DELETE"))
2776            .and(path(
2777                "/api/v1/repositories/briefcase-test/branches/main/objects",
2778            ))
2779            .respond_with(ResponseTemplate::new(404))
2780            .expect(1)
2781            .mount(&mock_server)
2782            .await;
2783
2784        let result = backend.delete_object("nonexistent.json").await;
2785        assert!(matches!(result, Err(StorageError::NotFound(_))));
2786    }
2787
2788    #[tokio::test]
2789    async fn test_upload_object_success() {
2790        // The staging flow makes three requests:
2791        //   1. GET  /staging/backing?path=...&presign=true  → staging location
2792        //   2. PUT  <presigned S3 URL> (routed back to mock) → S3 upload
2793        //   3. PUT  /staging/backing?path=...               → link
2794        let mock_server = MockServer::start().await;
2795        let config = create_test_config_with_endpoint(&mock_server.uri());
2796        let backend = LakeFSBackend::new(config).unwrap();
2797
2798        // Step 1: staging GET — return presigned URL pointing at mock server.
2799        // lakeFS Cloud uses flat "presigned_url" field (not "presign_info.url").
2800        let presigned_s3_url = format!("{}/fake-s3/test-file", mock_server.uri());
2801        Mock::given(method("GET"))
2802            .and(path(
2803                "/api/v1/repositories/briefcase-test/branches/main/staging/backing",
2804            ))
2805            .and(query_param("path", "test/file.json"))
2806            .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
2807                "physical_address": "s3://briefcase-test-bucket/test-file",
2808                "presigned_url": presigned_s3_url,
2809                "presigned_url_expiry": 9999999999i64
2810            })))
2811            .expect(1)
2812            .mount(&mock_server)
2813            .await;
2814
2815        // Step 2: presigned S3 PUT — return 200 with ETag.
2816        Mock::given(method("PUT"))
2817            .and(path("/fake-s3/test-file"))
2818            .respond_with(ResponseTemplate::new(200).append_header("etag", "\"abc123def456\""))
2819            .expect(1)
2820            .mount(&mock_server)
2821            .await;
2822
2823        // Step 3: staging link PUT → /objects with JSON body.
2824        Mock::given(method("PUT"))
2825            .and(path(
2826                "/api/v1/repositories/briefcase-test/branches/main/objects",
2827            ))
2828            .and(query_param("path", "test/file.json"))
2829            .respond_with(ResponseTemplate::new(200))
2830            .expect(1)
2831            .mount(&mock_server)
2832            .await;
2833
2834        backend
2835            .upload_object("test/file.json", b"hello world")
2836            .await
2837            .unwrap();
2838    }
2839
2840    #[tokio::test]
2841    async fn test_upload_object_server_error() {
2842        let mock_server = MockServer::start().await;
2843        let config = create_test_config_with_endpoint(&mock_server.uri());
2844        let backend = LakeFSBackend::new(config).unwrap();
2845
2846        // Fail at step 1 (staging GET returns 500).
2847        Mock::given(method("GET"))
2848            .and(path(
2849                "/api/v1/repositories/briefcase-test/branches/main/staging/backing",
2850            ))
2851            .respond_with(ResponseTemplate::new(500).set_body_string("Internal Server Error"))
2852            .expect(1)
2853            .mount(&mock_server)
2854            .await;
2855
2856        let result = backend.upload_object("test/file.json", b"data").await;
2857        assert!(result.is_err());
2858    }
2859
2860    #[tokio::test]
2861    async fn test_download_object_success() {
2862        let mock_server = MockServer::start().await;
2863        let config = create_test_config_with_endpoint(&mock_server.uri());
2864        let backend = LakeFSBackend::new(config).unwrap();
2865
2866        Mock::given(method("GET"))
2867            .and(path(
2868                "/api/v1/repositories/briefcase-test/refs/main/objects",
2869            ))
2870            .and(query_param("path", "test/file.json"))
2871            .respond_with(ResponseTemplate::new(200).set_body_bytes(b"file content here".to_vec()))
2872            .expect(1)
2873            .mount(&mock_server)
2874            .await;
2875
2876        let data = backend.download_object("test/file.json").await.unwrap();
2877        assert_eq!(data, b"file content here");
2878    }
2879
2880    #[tokio::test]
2881    async fn test_download_object_not_found() {
2882        let mock_server = MockServer::start().await;
2883        let config = create_test_config_with_endpoint(&mock_server.uri());
2884        let backend = LakeFSBackend::new(config).unwrap();
2885
2886        Mock::given(method("GET"))
2887            .and(path(
2888                "/api/v1/repositories/briefcase-test/refs/main/objects",
2889            ))
2890            .respond_with(ResponseTemplate::new(404))
2891            .expect(1)
2892            .mount(&mock_server)
2893            .await;
2894
2895        let result = backend.download_object("missing.json").await;
2896        assert!(matches!(result, Err(StorageError::NotFound(_))));
2897    }
2898
2899    #[tokio::test]
2900    async fn test_create_commit_success() {
2901        let mock_server = MockServer::start().await;
2902        let config = create_test_config_with_endpoint(&mock_server.uri());
2903        let backend = LakeFSBackend::new(config).unwrap();
2904
2905        Mock::given(method("POST"))
2906            .and(path(
2907                "/api/v1/repositories/briefcase-test/branches/main/commits",
2908            ))
2909            .respond_with(ResponseTemplate::new(201).set_body_json(json!({
2910                "id": "commit-sha-xyz"
2911            })))
2912            .expect(1)
2913            .mount(&mock_server)
2914            .await;
2915
2916        let id = backend.create_commit("Test commit").await.unwrap();
2917        assert_eq!(id, "commit-sha-xyz");
2918    }
2919
2920    #[tokio::test]
2921    async fn test_list_objects_success() {
2922        let mock_server = MockServer::start().await;
2923        let config = create_test_config_with_endpoint(&mock_server.uri());
2924        let backend = LakeFSBackend::new(config).unwrap();
2925
2926        Mock::given(method("GET"))
2927            .and(path(
2928                "/api/v1/repositories/briefcase-test/refs/main/objects/ls",
2929            ))
2930            .and(query_param("prefix", "snapshots/"))
2931            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2932                "results": [
2933                    {"path": "snapshots/a.json", "type": "object"},
2934                    {"path": "snapshots/b.json", "type": "object"},
2935                    {"path": "snapshots/", "type": "common_prefix"}
2936                ]
2937            })))
2938            .expect(1)
2939            .mount(&mock_server)
2940            .await;
2941
2942        let paths = backend.list_objects("snapshots/").await.unwrap();
2943        assert_eq!(paths.len(), 2);
2944        assert!(paths.contains(&"snapshots/a.json".to_string()));
2945        assert!(paths.contains(&"snapshots/b.json".to_string()));
2946    }
2947
2948    #[tokio::test]
2949    async fn test_health_check_healthy() {
2950        let mock_server = MockServer::start().await;
2951        let config = create_test_config_with_endpoint(&mock_server.uri());
2952        let backend = LakeFSBackend::new(config).unwrap();
2953
2954        Mock::given(method("GET"))
2955            .and(path("/api/v1/repositories/briefcase-test"))
2956            .respond_with(ResponseTemplate::new(200))
2957            .expect(1)
2958            .mount(&mock_server)
2959            .await;
2960
2961        assert!(backend.health_check().await.unwrap());
2962    }
2963
2964    #[tokio::test]
2965    async fn test_health_check_unhealthy() {
2966        let mock_server = MockServer::start().await;
2967        let config = create_test_config_with_endpoint(&mock_server.uri());
2968        let backend = LakeFSBackend::new(config).unwrap();
2969
2970        Mock::given(method("GET"))
2971            .and(path("/api/v1/repositories/briefcase-test"))
2972            .respond_with(ResponseTemplate::new(503))
2973            .expect(1)
2974            .mount(&mock_server)
2975            .await;
2976
2977        assert!(!backend.health_check().await.unwrap());
2978    }
2979
2980    #[tokio::test]
2981    async fn test_delete_via_trait_success() {
2982        // delete() now does stat-then-delete because lakeFS DELETE is idempotent
2983        // (always 204, can't distinguish found-and-deleted from already-absent).
2984        let mock_server = MockServer::start().await;
2985        let config = create_test_config_with_endpoint(&mock_server.uri());
2986        let backend = LakeFSBackend::new(config).unwrap();
2987
2988        // Step 1: stat returns 200 (object exists).
2989        Mock::given(method("GET"))
2990            .and(path(
2991                "/api/v1/repositories/briefcase-test/refs/main/objects/stat",
2992            ))
2993            .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
2994                "path": "snapshots/some-snapshot-id.json",
2995                "physical_address": "s3://bucket/some-snapshot-id",
2996                "size_bytes": 512,
2997                "content_type": "application/json",
2998                "mtime": 1700000000,
2999                "checksum": "abc123"
3000            })))
3001            .expect(1)
3002            .mount(&mock_server)
3003            .await;
3004
3005        // Step 2: delete returns 204.
3006        Mock::given(method("DELETE"))
3007            .and(path(
3008                "/api/v1/repositories/briefcase-test/branches/main/objects",
3009            ))
3010            .respond_with(ResponseTemplate::new(204))
3011            .expect(1)
3012            .mount(&mock_server)
3013            .await;
3014
3015        let deleted = backend.delete("some-snapshot-id").await.unwrap();
3016        assert!(deleted);
3017    }
3018
3019    #[tokio::test]
3020    async fn test_delete_via_trait_not_found() {
3021        let mock_server = MockServer::start().await;
3022        let config = create_test_config_with_endpoint(&mock_server.uri());
3023        let backend = LakeFSBackend::new(config).unwrap();
3024
3025        // stat returns 404 → delete() short-circuits and returns Ok(false).
3026        Mock::given(method("GET"))
3027            .and(path(
3028                "/api/v1/repositories/briefcase-test/refs/main/objects/stat",
3029            ))
3030            .respond_with(ResponseTemplate::new(404))
3031            .expect(1)
3032            .mount(&mock_server)
3033            .await;
3034
3035        // No DELETE call should be made.
3036        let deleted = backend.delete("nonexistent").await.unwrap();
3037        assert!(!deleted);
3038    }
3039
    // Note: integration tests against a real lakeFS instance require a running
    // server, so they are better suited to a separate integration test suite.
3042
3043    // ─── Auth provider tests ────────────────────────────────────────────────
3044
3045    #[test]
3046    fn test_lakefs_auth_enum_basic() {
3047        let auth = LakeFSAuth::Basic {
3048            access_key: "ak".into(),
3049            secret_key: "sk".into(),
3050        };
3051        match auth {
3052            LakeFSAuth::Basic {
3053                access_key,
3054                secret_key,
3055            } => {
3056                assert_eq!(access_key, "ak");
3057                assert_eq!(secret_key, "sk");
3058            }
3059            _ => panic!("Expected Basic"),
3060        }
3061    }
3062
3063    #[test]
3064    fn test_lakefs_auth_enum_all_variants() {
3065        // Ensure all 6 variants construct without panic.
3066        let _basic = LakeFSAuth::Basic {
3067            access_key: "a".into(),
3068            secret_key: "s".into(),
3069        };
3070        let _token = LakeFSAuth::Token {
3071            access_key: "a".into(),
3072            secret_key: "s".into(),
3073        };
3074        let _sts = LakeFSAuth::Sts {
3075            oidc_token: "tok".into(),
3076        };
3077        let _iam = LakeFSAuth::Iam;
3078        let _oidc = LakeFSAuth::Oidc {
3079            token: "tok".into(),
3080        };
3081        let _saml = LakeFSAuth::Saml {
3082            assertion: "xml".into(),
3083        };
3084    }
3085
3086    #[test]
3087    fn test_lakefs_config_with_auth() {
3088        let config = LakeFSConfig::new("http://localhost", "repo", "main", "ak", "sk").with_auth(
3089            LakeFSAuth::Token {
3090                access_key: "ak2".into(),
3091                secret_key: "sk2".into(),
3092            },
3093        );
3094        assert!(config.auth.is_some());
3095        match config.auth.unwrap() {
3096            LakeFSAuth::Token {
3097                access_key,
3098                secret_key,
3099            } => {
3100                assert_eq!(access_key, "ak2");
3101                assert_eq!(secret_key, "sk2");
3102            }
3103            _ => panic!("Expected Token"),
3104        }
3105    }
3106
3107    #[test]
3108    fn test_lakefs_config_without_auth_defaults_to_none() {
3109        let config = LakeFSConfig::new("http://localhost", "repo", "main", "ak", "sk");
3110        assert!(config.auth.is_none());
3111    }
3112
3113    #[tokio::test]
3114    async fn test_basic_auth_provider_header() {
3115        let provider = BasicAuthProvider::new(
3116            "AKIAIOSFODNN7EXAMPLE",
3117            "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
3118        )
3119        .unwrap();
3120        let header = provider.auth_header().await.unwrap();
3121        let header_str = header.to_str().unwrap();
3122        assert!(header_str.starts_with("Basic "));
3123        // Verify it's valid base64
3124        let b64_part = &header_str["Basic ".len()..];
3125        let decoded = general_purpose::STANDARD.decode(b64_part).unwrap();
3126        let decoded_str = String::from_utf8(decoded).unwrap();
3127        assert_eq!(
3128            decoded_str,
3129            "AKIAIOSFODNN7EXAMPLE:wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
3130        );
3131    }
3132
3133    #[tokio::test]
3134    async fn test_basic_auth_provider_mode_name() {
3135        let provider = BasicAuthProvider::new("ak", "sk").unwrap();
3136        assert_eq!(provider.mode_name(), "basic");
3137    }
3138
3139    #[tokio::test]
3140    async fn test_token_auth_provider_login_success() {
3141        let mock_server = MockServer::start().await;
3142
3143        // Mock the login endpoint
3144        Mock::given(method("POST"))
3145            .and(path("/api/v1/auth/login"))
3146            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
3147                "token": "eyJhbGciOiJIUzI1NiJ9.test",
3148                "token_expiration": 9999999999u64
3149            })))
3150            .expect(1)
3151            .mount(&mock_server)
3152            .await;
3153
3154        let provider = TokenAuthProvider::new(&mock_server.uri(), "ak", "sk");
3155        let header = provider.auth_header().await.unwrap();
3156        let header_str = header.to_str().unwrap();
3157        assert_eq!(header_str, "Bearer eyJhbGciOiJIUzI1NiJ9.test");
3158        assert_eq!(provider.mode_name(), "token");
3159    }
3160
3161    #[tokio::test]
3162    async fn test_token_auth_provider_caches_token() {
3163        let mock_server = MockServer::start().await;
3164
3165        Mock::given(method("POST"))
3166            .and(path("/api/v1/auth/login"))
3167            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
3168                "token": "cached-token",
3169                "token_expiration": 9999999999u64
3170            })))
3171            .expect(1)  // Should only be called once
3172            .mount(&mock_server)
3173            .await;
3174
3175        let provider = TokenAuthProvider::new(&mock_server.uri(), "ak", "sk");
3176
3177        // First call — fetches token
3178        let h1 = provider.auth_header().await.unwrap();
3179        // Second call — cache hit
3180        let h2 = provider.auth_header().await.unwrap();
3181        assert_eq!(h1, h2);
3182    }
3183
3184    #[tokio::test]
3185    async fn test_token_auth_provider_login_failure() {
3186        let mock_server = MockServer::start().await;
3187
3188        Mock::given(method("POST"))
3189            .and(path("/api/v1/auth/login"))
3190            .respond_with(ResponseTemplate::new(401).set_body_json(json!({
3191                "message": "invalid credentials"
3192            })))
3193            .expect(1)
3194            .mount(&mock_server)
3195            .await;
3196
3197        let provider = TokenAuthProvider::new(&mock_server.uri(), "bad", "creds");
3198        let result = provider.auth_header().await;
3199        assert!(result.is_err());
3200        let err_msg = format!("{}", result.unwrap_err());
3201        assert!(err_msg.contains("401"));
3202    }
3203
3204    #[tokio::test]
3205    async fn test_sts_auth_provider_success() {
3206        let mock_server = MockServer::start().await;
3207
3208        Mock::given(method("POST"))
3209            .and(path("/sts/login"))
3210            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
3211                "Credentials": {
3212                    "AccessKeyId": "ASIATEMP",
3213                    "SecretAccessKey": "tempSecret",
3214                    "SessionToken": "FwoGZXIvY...",
3215                    "Expiration": 9999999999u64
3216                }
3217            })))
3218            .expect(1)
3219            .mount(&mock_server)
3220            .await;
3221
3222        let provider = StsAuthProvider::new(&mock_server.uri(), "my-oidc-token");
3223        let header = provider.auth_header().await.unwrap();
3224        let header_str = header.to_str().unwrap();
3225        assert!(header_str.starts_with("Basic "));
3226        // Decode and verify it uses the temp credentials
3227        let b64_part = &header_str["Basic ".len()..];
3228        let decoded = general_purpose::STANDARD.decode(b64_part).unwrap();
3229        let decoded_str = String::from_utf8(decoded).unwrap();
3230        assert_eq!(decoded_str, "ASIATEMP:tempSecret");
3231        assert_eq!(provider.mode_name(), "sts");
3232    }
3233
3234    #[tokio::test]
3235    async fn test_oidc_auth_provider_success() {
3236        let mock_server = MockServer::start().await;
3237
3238        Mock::given(method("POST"))
3239            .and(path("/api/v1/oidc/login"))
3240            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
3241                "token": "oidc-lakefs-jwt-token",
3242                "token_expiration": 9999999999u64
3243            })))
3244            .expect(1)
3245            .mount(&mock_server)
3246            .await;
3247
3248        let provider = OidcAuthProvider::new(&mock_server.uri(), "external-oidc-token");
3249        let header = provider.auth_header().await.unwrap();
3250        let header_str = header.to_str().unwrap();
3251        assert_eq!(header_str, "Bearer oidc-lakefs-jwt-token");
3252        assert_eq!(provider.mode_name(), "oidc");
3253    }
3254
3255    #[tokio::test]
3256    async fn test_saml_auth_provider_success() {
3257        let mock_server = MockServer::start().await;
3258
3259        Mock::given(method("POST"))
3260            .and(path("/api/v1/auth/external/saml"))
3261            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
3262                "token": "saml-lakefs-jwt-token",
3263                "token_expiration": 9999999999u64
3264            })))
3265            .expect(1)
3266            .mount(&mock_server)
3267            .await;
3268
3269        let provider = SamlAuthProvider::new(&mock_server.uri(), "PHNhbWxSZXNwb25zZT4=");
3270        let header = provider.auth_header().await.unwrap();
3271        let header_str = header.to_str().unwrap();
3272        assert_eq!(header_str, "Bearer saml-lakefs-jwt-token");
3273        assert_eq!(provider.mode_name(), "saml");
3274    }
3275
3276    #[tokio::test]
3277    async fn test_iam_auth_provider_mode_name() {
3278        let provider = IamAuthProvider::new();
3279        assert_eq!(provider.mode_name(), "iam");
3280    }
3281
3282    #[tokio::test]
3283    async fn test_build_auth_provider_basic() {
3284        let auth = LakeFSAuth::Basic {
3285            access_key: "ak".into(),
3286            secret_key: "sk".into(),
3287        };
3288        let provider = build_auth_provider(&auth, "http://localhost").unwrap();
3289        assert_eq!(provider.mode_name(), "basic");
3290        let header = provider.auth_header().await.unwrap();
3291        assert!(header.to_str().unwrap().starts_with("Basic "));
3292    }
3293
3294    #[tokio::test]
3295    async fn test_build_auth_provider_token() {
3296        let auth = LakeFSAuth::Token {
3297            access_key: "ak".into(),
3298            secret_key: "sk".into(),
3299        };
3300        let provider = build_auth_provider(&auth, "http://localhost").unwrap();
3301        assert_eq!(provider.mode_name(), "token");
3302    }
3303
3304    #[tokio::test]
3305    async fn test_build_auth_provider_iam() {
3306        let auth = LakeFSAuth::Iam;
3307        let provider = build_auth_provider(&auth, "http://localhost").unwrap();
3308        assert_eq!(provider.mode_name(), "iam");
3309    }
3310
3311    #[tokio::test]
3312    async fn test_backend_with_basic_auth_config() {
3313        let mock_server = MockServer::start().await;
3314
3315        let config = LakeFSConfig::new(
3316            mock_server.uri(),
3317            "test-repo",
3318            "main",
3319            "test_key",
3320            "test_secret",
3321        )
3322        .with_auth(LakeFSAuth::Basic {
3323            access_key: "test_key".into(),
3324            secret_key: "test_secret".into(),
3325        });
3326
3327        let backend = LakeFSBackend::new(config).unwrap();
3328        assert_eq!(backend.auth_mode(), "basic");
3329    }
3330
3331    #[tokio::test]
3332    async fn test_backend_with_token_auth_sends_bearer() {
3333        let mock_server = MockServer::start().await;
3334
3335        // Mock login endpoint
3336        Mock::given(method("POST"))
3337            .and(path("/api/v1/auth/login"))
3338            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
3339                "token": "test-jwt-token",
3340                "token_expiration": 9999999999u64
3341            })))
3342            .mount(&mock_server)
3343            .await;
3344
3345        // Mock repository endpoint (health check target)
3346        Mock::given(method("GET"))
3347            .and(path("/api/v1/repositories/test-repo"))
3348            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
3349                "id": "test-repo",
3350                "storage_namespace": "s3://test",
3351                "default_branch": "main"
3352            })))
3353            .mount(&mock_server)
3354            .await;
3355
3356        let config = LakeFSConfig::new(mock_server.uri(), "test-repo", "main", "ak", "sk")
3357            .with_auth(LakeFSAuth::Token {
3358                access_key: "ak".into(),
3359                secret_key: "sk".into(),
3360            });
3361
3362        let backend = LakeFSBackend::new(config).unwrap();
3363        assert_eq!(backend.auth_mode(), "token");
3364
3365        // Verify it can make an authenticated request
3366        let health = backend.health_check().await.unwrap();
3367        assert!(health);
3368    }
3369
3370    #[tokio::test]
3371    async fn test_backend_default_auth_is_basic() {
3372        let config = LakeFSConfig::new("http://localhost:8000", "test-repo", "main", "ak", "sk");
3373        // No .with_auth() → should default to Basic
3374        let backend = LakeFSBackend::new(config).unwrap();
3375        assert_eq!(backend.auth_mode(), "basic");
3376    }
3377}
3378
// ── Integration tests ─────────────────────────────────────────────────────────
// The module body lives in `lakefs_integration_tests.rs` and is compiled only
// for test builds with both the `async` and `networking` features enabled.
// Run with:
//   cargo test -p briefcase-core --no-default-features \
//     --features "async,storage,networking" integration_tests -- --ignored
#[cfg(test)]
#[cfg(all(feature = "async", feature = "networking"))]
#[path = "lakefs_integration_tests.rs"]
mod integration_tests;