1use super::{FlushResult, SnapshotQuery, StorageBackend, StorageError};
2use crate::models::{DecisionSnapshot, Snapshot};
3#[cfg(feature = "networking")]
4use base64::{engine::general_purpose, Engine as _};
5#[cfg(feature = "networking")]
6use reqwest::{
7 header::{HeaderMap, HeaderValue, AUTHORIZATION, CONTENT_TYPE},
8 Client, RequestBuilder, Response,
9};
10use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12use std::sync::{Arc, Mutex};
13
14#[derive(Debug, Clone)]
25pub enum LakeFSAuth {
26 Basic {
29 access_key: String,
30 secret_key: String,
31 },
32
33 Token {
37 access_key: String,
38 secret_key: String,
39 },
40
41 Sts {
45 oidc_token: String,
47 },
48
49 Iam,
53
54 Oidc {
58 token: String,
60 },
61
62 Saml {
66 assertion: String,
68 },
69}
70
71#[cfg(all(feature = "async", feature = "networking"))]
79#[async_trait::async_trait]
80pub(crate) trait AuthProvider: Send + Sync {
81 async fn auth_header(&self) -> Result<HeaderValue, StorageError>;
86
87 fn mode_name(&self) -> &'static str;
89}
90
91#[cfg(all(feature = "async", feature = "networking"))]
94pub(crate) struct BasicAuthProvider {
95 header: HeaderValue,
96}
97
98#[cfg(all(feature = "async", feature = "networking"))]
99impl BasicAuthProvider {
100 pub fn new(access_key: &str, secret_key: &str) -> Result<Self, StorageError> {
101 let credentials = format!("{}:{}", access_key, secret_key);
102 let encoded = general_purpose::STANDARD.encode(credentials.as_bytes());
103 let header_str = format!("Basic {}", encoded);
104 let header = HeaderValue::from_str(&header_str).map_err(|e| {
105 StorageError::ConnectionError(format!("Invalid Basic auth header: {}", e))
106 })?;
107 Ok(Self { header })
108 }
109}
110
111#[cfg(all(feature = "async", feature = "networking"))]
112#[async_trait::async_trait]
113impl AuthProvider for BasicAuthProvider {
114 async fn auth_header(&self) -> Result<HeaderValue, StorageError> {
115 Ok(self.header.clone())
116 }
117 fn mode_name(&self) -> &'static str {
118 "basic"
119 }
120}
121
122#[cfg(all(feature = "async", feature = "networking"))]
125pub(crate) struct TokenAuthProvider {
126 endpoint: String,
127 access_key: String,
128 secret_key: String,
129 client: Client,
130 cache: Mutex<Option<(String, u64)>>,
132}
133
134#[cfg(all(feature = "async", feature = "networking"))]
135impl TokenAuthProvider {
136 pub fn new(endpoint: &str, access_key: &str, secret_key: &str) -> Self {
137 let client = Client::builder()
138 .timeout(std::time::Duration::from_secs(10))
139 .build()
140 .unwrap_or_default();
141 Self {
142 endpoint: endpoint.trim_end_matches('/').to_string(),
143 access_key: access_key.to_string(),
144 secret_key: secret_key.to_string(),
145 client,
146 cache: Mutex::new(None),
147 }
148 }
149
150 async fn fetch_token(&self) -> Result<(String, u64), StorageError> {
152 #[derive(Serialize)]
153 struct LoginRequest {
154 access_key_id: String,
155 secret_access_key: String,
156 }
157
158 #[derive(Deserialize)]
159 struct LoginResponse {
160 token: String,
161 #[serde(default)]
162 token_expiration: Option<u64>,
163 }
164
165 let url = format!("{}/api/v1/auth/login", self.endpoint);
166 let body = LoginRequest {
167 access_key_id: self.access_key.clone(),
168 secret_access_key: self.secret_key.clone(),
169 };
170
171 let resp = self
172 .client
173 .post(&url)
174 .json(&body)
175 .send()
176 .await
177 .map_err(|e| StorageError::ConnectionError(format!("Token login failed: {}", e)))?;
178
179 if !resp.status().is_success() {
180 let status = resp.status();
181 let text = resp.text().await.unwrap_or_default();
182 return Err(StorageError::ConnectionError(format!(
183 "Token login returned {}: {}",
184 status, text
185 )));
186 }
187
188 let login: LoginResponse = resp.json().await.map_err(|e| {
189 StorageError::ConnectionError(format!("Failed to parse login response: {}", e))
190 })?;
191
192 let now_secs = std::time::SystemTime::now()
194 .duration_since(std::time::UNIX_EPOCH)
195 .unwrap_or_default()
196 .as_secs();
197 let expiry = login.token_expiration.unwrap_or(now_secs + 3600);
198
199 Ok((login.token, expiry))
200 }
201}
202
203#[cfg(all(feature = "async", feature = "networking"))]
204#[async_trait::async_trait]
205impl AuthProvider for TokenAuthProvider {
206 async fn auth_header(&self) -> Result<HeaderValue, StorageError> {
207 let needs_refresh = {
209 let cache = self.cache.lock().unwrap();
210 match &*cache {
211 Some((_token, expiry)) => {
212 let now = std::time::SystemTime::now()
213 .duration_since(std::time::UNIX_EPOCH)
214 .unwrap_or_default()
215 .as_secs();
216 now + 60 >= *expiry
217 }
218 None => true,
219 }
220 };
221
222 if needs_refresh {
223 let (token, expiry) = self.fetch_token().await?;
224 let mut cache = self.cache.lock().unwrap();
225 *cache = Some((token, expiry));
226 }
227
228 let cache = self.cache.lock().unwrap();
229 let (token, _) = cache.as_ref().unwrap();
230 let header_str = format!("Bearer {}", token);
231 HeaderValue::from_str(&header_str)
232 .map_err(|e| StorageError::ConnectionError(format!("Invalid Bearer header: {}", e)))
233 }
234
235 fn mode_name(&self) -> &'static str {
236 "token"
237 }
238}
239
240#[cfg(all(feature = "async", feature = "networking"))]
243pub(crate) struct StsAuthProvider {
244 endpoint: String,
245 oidc_token: String,
246 client: Client,
247 cache: Mutex<Option<(String, String, String, u64)>>,
249}
250
251#[cfg(all(feature = "async", feature = "networking"))]
252impl StsAuthProvider {
253 pub fn new(endpoint: &str, oidc_token: &str) -> Self {
254 let client = Client::builder()
255 .timeout(std::time::Duration::from_secs(10))
256 .build()
257 .unwrap_or_default();
258 Self {
259 endpoint: endpoint.trim_end_matches('/').to_string(),
260 oidc_token: oidc_token.to_string(),
261 client,
262 cache: Mutex::new(None),
263 }
264 }
265
266 async fn fetch_temp_credentials(&self) -> Result<(String, String, String, u64), StorageError> {
267 #[derive(Deserialize)]
268 struct StsResponse {
269 #[serde(rename = "Credentials")]
270 credentials: StsCredentials,
271 }
272
273 #[derive(Deserialize)]
274 struct StsCredentials {
275 #[serde(rename = "AccessKeyId")]
276 access_key_id: String,
277 #[serde(rename = "SecretAccessKey")]
278 secret_access_key: String,
279 #[serde(rename = "SessionToken")]
280 session_token: String,
281 #[serde(rename = "Expiration", default)]
282 expiration: Option<u64>,
283 }
284
285 let url = format!("{}/sts/login", self.endpoint);
286
287 let resp = self
288 .client
289 .post(&url)
290 .header("Content-Type", "application/x-www-form-urlencoded")
291 .body(format!(
292 "Action=AssumeRoleWithWebIdentity&WebIdentityToken={}&Version=2011-06-15",
293 self.oidc_token
294 ))
295 .send()
296 .await
297 .map_err(|e| StorageError::ConnectionError(format!("STS login failed: {}", e)))?;
298
299 if !resp.status().is_success() {
300 let status = resp.status();
301 let text = resp.text().await.unwrap_or_default();
302 return Err(StorageError::ConnectionError(format!(
303 "STS login returned {}: {}",
304 status, text
305 )));
306 }
307
308 let sts: StsResponse = resp.json().await.map_err(|e| {
309 StorageError::ConnectionError(format!("Failed to parse STS response: {}", e))
310 })?;
311
312 let now_secs = std::time::SystemTime::now()
313 .duration_since(std::time::UNIX_EPOCH)
314 .unwrap_or_default()
315 .as_secs();
316 let expiry = sts.credentials.expiration.unwrap_or(now_secs + 3600);
317
318 Ok((
319 sts.credentials.access_key_id,
320 sts.credentials.secret_access_key,
321 sts.credentials.session_token,
322 expiry,
323 ))
324 }
325}
326
327#[cfg(all(feature = "async", feature = "networking"))]
328#[async_trait::async_trait]
329impl AuthProvider for StsAuthProvider {
330 async fn auth_header(&self) -> Result<HeaderValue, StorageError> {
331 let needs_refresh = {
332 let cache = self.cache.lock().unwrap();
333 match &*cache {
334 Some((_, _, _, expiry)) => {
335 let now = std::time::SystemTime::now()
336 .duration_since(std::time::UNIX_EPOCH)
337 .unwrap_or_default()
338 .as_secs();
339 now + 60 >= *expiry
340 }
341 None => true,
342 }
343 };
344
345 if needs_refresh {
346 let creds = self.fetch_temp_credentials().await?;
347 let mut cache = self.cache.lock().unwrap();
348 *cache = Some(creds);
349 }
350
351 let cache = self.cache.lock().unwrap();
355 let (ak, sk, _session_token, _) = cache.as_ref().unwrap();
356 let credentials = format!("{}:{}", ak, sk);
357 let encoded = general_purpose::STANDARD.encode(credentials.as_bytes());
358 let header_str = format!("Basic {}", encoded);
359 HeaderValue::from_str(&header_str)
360 .map_err(|e| StorageError::ConnectionError(format!("Invalid STS auth header: {}", e)))
361 }
362
363 fn mode_name(&self) -> &'static str {
364 "sts"
365 }
366}
367
368#[cfg(all(feature = "async", feature = "networking"))]
371pub(crate) struct IamAuthProvider {
372 client: Client,
373 cache: Mutex<Option<IamCredentialCache>>,
375}
376
377#[cfg(all(feature = "async", feature = "networking"))]
378type IamCredentialCache = (String, String, Option<String>, u64);
379
380#[cfg(all(feature = "async", feature = "networking"))]
381impl IamAuthProvider {
382 pub fn new() -> Self {
383 let client = Client::builder()
384 .timeout(std::time::Duration::from_secs(5))
385 .build()
386 .unwrap_or_default();
387 Self {
388 client,
389 cache: Mutex::new(None),
390 }
391 }
392
393 async fn fetch_credentials(&self) -> Result<IamCredentialCache, StorageError> {
395 if let Ok(creds_uri) = std::env::var("AWS_CONTAINER_CREDENTIALS_RELATIVE_URI") {
397 let url = format!("http://169.254.170.2{}", creds_uri);
398 if let Ok(resp) = self.client.get(&url).send().await {
399 if resp.status().is_success() {
400 if let Ok(creds) = resp.json::<IamCredentials>().await {
401 let now_secs = std::time::SystemTime::now()
402 .duration_since(std::time::UNIX_EPOCH)
403 .unwrap_or_default()
404 .as_secs();
405 return Ok((
406 creds.access_key_id,
407 creds.secret_access_key,
408 creds.token,
409 now_secs + 3600,
410 ));
411 }
412 }
413 }
414 }
415
416 let token_resp = self
418 .client
419 .put("http://169.254.169.254/latest/api/token")
420 .header("X-aws-ec2-metadata-token-ttl-seconds", "21600")
421 .send()
422 .await
423 .map_err(|e| {
424 StorageError::ConnectionError(format!("IAM: Failed to fetch IMDS token: {}", e))
425 })?;
426
427 let imds_token = token_resp.text().await.map_err(|e| {
428 StorageError::ConnectionError(format!("IAM: Failed to read IMDS token: {}", e))
429 })?;
430
431 let role_resp = self
433 .client
434 .get("http://169.254.169.254/latest/meta-data/iam/security-credentials/")
435 .header("X-aws-ec2-metadata-token", &imds_token)
436 .send()
437 .await
438 .map_err(|e| {
439 StorageError::ConnectionError(format!("IAM: Failed to fetch role name: {}", e))
440 })?;
441
442 let role_name = role_resp.text().await.map_err(|e| {
443 StorageError::ConnectionError(format!("IAM: Failed to read role name: {}", e))
444 })?;
445 let role_name = role_name.trim();
446
447 let creds_url = format!(
449 "http://169.254.169.254/latest/meta-data/iam/security-credentials/{}",
450 role_name
451 );
452 let creds_resp = self
453 .client
454 .get(&creds_url)
455 .header("X-aws-ec2-metadata-token", &imds_token)
456 .send()
457 .await
458 .map_err(|e| {
459 StorageError::ConnectionError(format!("IAM: Failed to fetch credentials: {}", e))
460 })?;
461
462 let creds: IamCredentials = creds_resp.json().await.map_err(|e| {
463 StorageError::ConnectionError(format!("IAM: Failed to parse credentials: {}", e))
464 })?;
465
466 let now_secs = std::time::SystemTime::now()
467 .duration_since(std::time::UNIX_EPOCH)
468 .unwrap_or_default()
469 .as_secs();
470
471 Ok((
472 creds.access_key_id,
473 creds.secret_access_key,
474 creds.token,
475 now_secs + 3600,
476 ))
477 }
478}
479
480#[cfg(all(feature = "async", feature = "networking"))]
481#[derive(Deserialize)]
482struct IamCredentials {
483 #[serde(rename = "AccessKeyId", alias = "accessKeyId")]
484 access_key_id: String,
485 #[serde(rename = "SecretAccessKey", alias = "secretAccessKey")]
486 secret_access_key: String,
487 #[serde(rename = "Token", alias = "token", alias = "SessionToken", default)]
488 token: Option<String>,
489}
490
491#[cfg(all(feature = "async", feature = "networking"))]
492#[async_trait::async_trait]
493impl AuthProvider for IamAuthProvider {
494 async fn auth_header(&self) -> Result<HeaderValue, StorageError> {
495 let needs_refresh = {
496 let cache = self.cache.lock().unwrap();
497 match &*cache {
498 Some((_, _, _, expiry)) => {
499 let now = std::time::SystemTime::now()
500 .duration_since(std::time::UNIX_EPOCH)
501 .unwrap_or_default()
502 .as_secs();
503 now + 60 >= *expiry
504 }
505 None => true,
506 }
507 };
508
509 if needs_refresh {
510 let creds = self.fetch_credentials().await?;
511 let mut cache = self.cache.lock().unwrap();
512 *cache = Some(creds);
513 }
514
515 let cache = self.cache.lock().unwrap();
516 let (ak, sk, _, _) = cache.as_ref().unwrap();
517 let credentials = format!("{}:{}", ak, sk);
518 let encoded = general_purpose::STANDARD.encode(credentials.as_bytes());
519 let header_str = format!("Basic {}", encoded);
520 HeaderValue::from_str(&header_str)
521 .map_err(|e| StorageError::ConnectionError(format!("Invalid IAM auth header: {}", e)))
522 }
523
524 fn mode_name(&self) -> &'static str {
525 "iam"
526 }
527}
528
529#[cfg(all(feature = "async", feature = "networking"))]
532pub(crate) struct OidcAuthProvider {
533 endpoint: String,
534 external_token: String,
535 client: Client,
536 cache: Mutex<Option<(String, u64)>>,
538}
539
540#[cfg(all(feature = "async", feature = "networking"))]
541impl OidcAuthProvider {
542 pub fn new(endpoint: &str, external_token: &str) -> Self {
543 let client = Client::builder()
544 .timeout(std::time::Duration::from_secs(10))
545 .build()
546 .unwrap_or_default();
547 Self {
548 endpoint: endpoint.trim_end_matches('/').to_string(),
549 external_token: external_token.to_string(),
550 client,
551 cache: Mutex::new(None),
552 }
553 }
554
555 async fn exchange_token(&self) -> Result<(String, u64), StorageError> {
556 #[derive(Deserialize)]
557 struct OidcResponse {
558 token: String,
559 #[serde(default)]
560 token_expiration: Option<u64>,
561 }
562
563 let url = format!("{}/api/v1/oidc/login", self.endpoint);
564
565 let resp = self
566 .client
567 .post(&url)
568 .header("Content-Type", "application/x-www-form-urlencoded")
569 .body(format!("code={}", self.external_token))
570 .send()
571 .await
572 .map_err(|e| StorageError::ConnectionError(format!("OIDC login failed: {}", e)))?;
573
574 if !resp.status().is_success() {
575 let status = resp.status();
576 let text = resp.text().await.unwrap_or_default();
577 return Err(StorageError::ConnectionError(format!(
578 "OIDC login returned {}: {}",
579 status, text
580 )));
581 }
582
583 let oidc: OidcResponse = resp.json().await.map_err(|e| {
584 StorageError::ConnectionError(format!("Failed to parse OIDC response: {}", e))
585 })?;
586
587 let now_secs = std::time::SystemTime::now()
588 .duration_since(std::time::UNIX_EPOCH)
589 .unwrap_or_default()
590 .as_secs();
591 let expiry = oidc.token_expiration.unwrap_or(now_secs + 3600);
592
593 Ok((oidc.token, expiry))
594 }
595}
596
597#[cfg(all(feature = "async", feature = "networking"))]
598#[async_trait::async_trait]
599impl AuthProvider for OidcAuthProvider {
600 async fn auth_header(&self) -> Result<HeaderValue, StorageError> {
601 let needs_refresh = {
602 let cache = self.cache.lock().unwrap();
603 match &*cache {
604 Some((_token, expiry)) => {
605 let now = std::time::SystemTime::now()
606 .duration_since(std::time::UNIX_EPOCH)
607 .unwrap_or_default()
608 .as_secs();
609 now + 60 >= *expiry
610 }
611 None => true,
612 }
613 };
614
615 if needs_refresh {
616 let (token, expiry) = self.exchange_token().await?;
617 let mut cache = self.cache.lock().unwrap();
618 *cache = Some((token, expiry));
619 }
620
621 let cache = self.cache.lock().unwrap();
622 let (token, _) = cache.as_ref().unwrap();
623 let header_str = format!("Bearer {}", token);
624 HeaderValue::from_str(&header_str).map_err(|e| {
625 StorageError::ConnectionError(format!("Invalid OIDC Bearer header: {}", e))
626 })
627 }
628
629 fn mode_name(&self) -> &'static str {
630 "oidc"
631 }
632}
633
634#[cfg(all(feature = "async", feature = "networking"))]
637pub(crate) struct SamlAuthProvider {
638 endpoint: String,
639 assertion: String,
640 client: Client,
641 cache: Mutex<Option<(String, u64)>>,
643}
644
645#[cfg(all(feature = "async", feature = "networking"))]
646impl SamlAuthProvider {
647 pub fn new(endpoint: &str, assertion: &str) -> Self {
648 let client = Client::builder()
649 .timeout(std::time::Duration::from_secs(10))
650 .build()
651 .unwrap_or_default();
652 Self {
653 endpoint: endpoint.trim_end_matches('/').to_string(),
654 assertion: assertion.to_string(),
655 client,
656 cache: Mutex::new(None),
657 }
658 }
659
660 async fn exchange_assertion(&self) -> Result<(String, u64), StorageError> {
661 #[derive(Deserialize)]
662 struct SamlResponse {
663 token: String,
664 #[serde(default)]
665 token_expiration: Option<u64>,
666 }
667
668 let url = format!("{}/api/v1/auth/external/saml", self.endpoint);
669
670 let resp = self
671 .client
672 .post(&url)
673 .header("Content-Type", "application/x-www-form-urlencoded")
674 .body(format!("SAMLResponse={}", self.assertion))
675 .send()
676 .await
677 .map_err(|e| StorageError::ConnectionError(format!("SAML login failed: {}", e)))?;
678
679 if !resp.status().is_success() {
680 let status = resp.status();
681 let text = resp.text().await.unwrap_or_default();
682 return Err(StorageError::ConnectionError(format!(
683 "SAML login returned {}: {}",
684 status, text
685 )));
686 }
687
688 let saml: SamlResponse = resp.json().await.map_err(|e| {
689 StorageError::ConnectionError(format!("Failed to parse SAML response: {}", e))
690 })?;
691
692 let now_secs = std::time::SystemTime::now()
693 .duration_since(std::time::UNIX_EPOCH)
694 .unwrap_or_default()
695 .as_secs();
696 let expiry = saml.token_expiration.unwrap_or(now_secs + 3600);
697
698 Ok((saml.token, expiry))
699 }
700}
701
702#[cfg(all(feature = "async", feature = "networking"))]
703#[async_trait::async_trait]
704impl AuthProvider for SamlAuthProvider {
705 async fn auth_header(&self) -> Result<HeaderValue, StorageError> {
706 let needs_refresh = {
707 let cache = self.cache.lock().unwrap();
708 match &*cache {
709 Some((_token, expiry)) => {
710 let now = std::time::SystemTime::now()
711 .duration_since(std::time::UNIX_EPOCH)
712 .unwrap_or_default()
713 .as_secs();
714 now + 60 >= *expiry
715 }
716 None => true,
717 }
718 };
719
720 if needs_refresh {
721 let (token, expiry) = self.exchange_assertion().await?;
722 let mut cache = self.cache.lock().unwrap();
723 *cache = Some((token, expiry));
724 }
725
726 let cache = self.cache.lock().unwrap();
727 let (token, _) = cache.as_ref().unwrap();
728 let header_str = format!("Bearer {}", token);
729 HeaderValue::from_str(&header_str).map_err(|e| {
730 StorageError::ConnectionError(format!("Invalid SAML Bearer header: {}", e))
731 })
732 }
733
734 fn mode_name(&self) -> &'static str {
735 "saml"
736 }
737}
738
739#[cfg(all(feature = "async", feature = "networking"))]
742fn build_auth_provider(
743 auth: &LakeFSAuth,
744 endpoint: &str,
745) -> Result<Arc<dyn AuthProvider>, StorageError> {
746 match auth {
747 LakeFSAuth::Basic {
748 access_key,
749 secret_key,
750 } => Ok(Arc::new(BasicAuthProvider::new(access_key, secret_key)?)),
751 LakeFSAuth::Token {
752 access_key,
753 secret_key,
754 } => Ok(Arc::new(TokenAuthProvider::new(
755 endpoint, access_key, secret_key,
756 ))),
757 LakeFSAuth::Sts { oidc_token } => Ok(Arc::new(StsAuthProvider::new(endpoint, oidc_token))),
758 LakeFSAuth::Iam => Ok(Arc::new(IamAuthProvider::new())),
759 LakeFSAuth::Oidc { token } => Ok(Arc::new(OidcAuthProvider::new(endpoint, token))),
760 LakeFSAuth::Saml { assertion } => Ok(Arc::new(SamlAuthProvider::new(endpoint, assertion))),
761 }
762}
763
764#[derive(Debug, Clone, Serialize, Deserialize)]
768pub struct MergeResult {
769 pub commit_id: String,
771 pub summary: String,
773}
774
775#[derive(Debug, Clone, Serialize, Deserialize)]
777pub struct CommitInfo {
778 pub id: String,
779 pub message: String,
780 pub committer: String,
781 #[serde(default)]
782 pub metadata: HashMap<String, String>,
783 #[serde(default)]
785 pub creation_date: i64,
786}
787
788#[derive(Debug, Clone, Serialize, Deserialize)]
790pub struct DiffEntry {
791 pub path: String,
793 #[serde(rename = "type")]
795 pub diff_type: String,
796 #[serde(default)]
798 pub size_bytes: Option<u64>,
799}
800
801#[derive(Debug, Clone, Serialize, Deserialize)]
803pub struct ObjectStats {
804 pub path: String,
805 #[serde(default)]
806 pub physical_address: String,
807 pub size_bytes: u64,
808 #[serde(default)]
809 pub content_type: String,
810 #[serde(default)]
812 pub mtime: i64,
813 #[serde(default)]
814 pub checksum: String,
815}
816
817#[derive(Debug, Clone, Serialize, Deserialize)]
819pub struct LakeFSAction {
820 pub name: String,
821 pub on: Vec<String>,
823 pub hooks: Vec<ActionHook>,
824}
825
826#[derive(Debug, Clone, Serialize, Deserialize)]
828pub struct ActionHook {
829 pub id: String,
830 #[serde(rename = "type")]
831 pub hook_type: String,
832 pub description: String,
833 #[serde(default)]
834 pub properties: HashMap<String, String>,
835}
836
837#[derive(Debug, Clone, Serialize, Deserialize)]
839pub struct RepositoryInfo {
840 pub id: String,
841 pub storage_namespace: String,
842 pub default_branch: String,
843 #[serde(default)]
844 pub creation_date: i64,
845}
846
847#[derive(Debug, Clone, Serialize, Deserialize)]
849pub struct BranchInfo {
850 pub id: String,
851 pub commit_id: String,
852}
853
854#[cfg(all(feature = "async", feature = "networking"))]
857#[derive(Clone)]
858pub struct LakeFSBackend {
859 client: Client,
860 endpoint: String,
861 repository: String, branch: String, #[allow(dead_code)]
864 access_key: String,
865 #[allow(dead_code)]
866 secret_key: String,
867 auth_provider: Arc<dyn AuthProvider>,
870 pending_writes: Arc<Mutex<Vec<Snapshot>>>,
871}
872
873#[cfg(all(feature = "async", feature = "networking"))]
874impl LakeFSBackend {
875 pub fn new(config: LakeFSConfig) -> Result<Self, StorageError> {
876 let endpoint = config.endpoint.trim_end_matches('/').to_string();
877
878 let auth = config.auth.clone().unwrap_or_else(|| LakeFSAuth::Basic {
880 access_key: config.access_key.clone(),
881 secret_key: config.secret_key.clone(),
882 });
883 let auth_provider = build_auth_provider(&auth, &endpoint)?;
884
885 let mut headers = HeaderMap::new();
887 headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
888
889 let client = Client::builder()
890 .default_headers(headers)
891 .timeout(std::time::Duration::from_secs(30))
892 .build()
893 .map_err(|e| {
894 StorageError::ConnectionError(format!("Failed to create HTTP client: {}", e))
895 })?;
896
897 Ok(Self {
898 client,
899 endpoint,
900 repository: config.repository,
901 branch: config.branch,
902 access_key: config.access_key,
903 secret_key: config.secret_key,
904 auth_provider,
905 pending_writes: Arc::new(Mutex::new(Vec::new())),
906 })
907 }
908
909 async fn send_authed(&self, builder: RequestBuilder) -> Result<Response, StorageError> {
914 let auth_header = self.auth_provider.auth_header().await?;
915 builder
916 .header(AUTHORIZATION, auth_header)
917 .send()
918 .await
919 .map_err(|e| StorageError::ConnectionError(format!("Request failed: {}", e)))
920 }
921
922 pub fn auth_mode(&self) -> &'static str {
924 self.auth_provider.mode_name()
925 }
926
927 pub fn repository(&self) -> &str {
931 &self.repository
932 }
933
934 pub fn branch(&self) -> &str {
936 &self.branch
937 }
938
939 pub fn endpoint(&self) -> &str {
941 &self.endpoint
942 }
943
944 pub async fn create_commit(&self, message: &str) -> Result<String, StorageError> {
948 let url = format!(
949 "{}/api/v1/repositories/{}/branches/{}/commits",
950 self.endpoint, self.repository, self.branch
951 );
952
953 #[derive(Serialize)]
954 struct CommitRequest {
955 message: String,
956 metadata: HashMap<String, String>,
957 }
958
959 let mut metadata = HashMap::new();
960 metadata.insert("source".to_string(), "briefcase-ai".to_string());
961
962 let request = CommitRequest {
963 message: message.to_string(),
964 metadata,
965 };
966
967 let response = self
968 .send_authed(self.client.post(&url).json(&request))
969 .await?;
970
971 let status = response.status();
972 if !status.is_success() {
973 let error_text = response.text().await.unwrap_or_default();
974 return Err(StorageError::ConnectionError(format!(
975 "Commit failed with status {}: {}",
976 status, error_text
977 )));
978 }
979
980 #[derive(Deserialize)]
981 struct CommitResponse {
982 id: String,
983 }
984
985 let commit_response: CommitResponse = response.json().await.map_err(|e| {
986 StorageError::SerializationError(format!("Failed to parse commit response: {}", e))
987 })?;
988
989 Ok(commit_response.id)
990 }
991
992 pub async fn upload_object(&self, path: &str, data: &[u8]) -> Result<(), StorageError> {
996 let url = format!(
997 "{}/api/v1/repositories/{}/branches/{}/objects",
998 self.endpoint, self.repository, self.branch
999 );
1000
1001 let response = self
1002 .send_authed(
1003 self.client
1004 .put(&url)
1005 .query(&[("path", path)])
1006 .header("Content-Type", "application/octet-stream")
1007 .body(data.to_vec()),
1008 )
1009 .await?;
1010
1011 let status = response.status();
1012 if !status.is_success() {
1013 let error_text = response.text().await.unwrap_or_default();
1014 return Err(StorageError::ConnectionError(format!(
1015 "Upload failed with status {}: {}",
1016 status, error_text
1017 )));
1018 }
1019
1020 Ok(())
1021 }
1022
1023 pub async fn download_object(&self, path: &str) -> Result<Vec<u8>, StorageError> {
1025 let url = format!(
1026 "{}/api/v1/repositories/{}/refs/{}/objects",
1027 self.endpoint, self.repository, self.branch
1028 );
1029
1030 let response = self
1031 .send_authed(self.client.get(&url).query(&[("path", path)]))
1032 .await?;
1033
1034 if response.status() == reqwest::StatusCode::NOT_FOUND {
1035 return Err(StorageError::NotFound(format!(
1036 "Object not found: {}",
1037 path
1038 )));
1039 }
1040
1041 let status = response.status();
1042 if !status.is_success() {
1043 let error_text = response.text().await.unwrap_or_default();
1044 return Err(StorageError::ConnectionError(format!(
1045 "Download failed with status {}: {}",
1046 status, error_text
1047 )));
1048 }
1049
1050 let data = response.bytes().await.map_err(|e| {
1051 StorageError::ConnectionError(format!("Failed to read response: {}", e))
1052 })?;
1053
1054 Ok(data.to_vec())
1055 }
1056
1057 pub async fn list_objects(&self, prefix: &str) -> Result<Vec<String>, StorageError> {
1059 let url = format!(
1060 "{}/api/v1/repositories/{}/refs/{}/objects/ls",
1061 self.endpoint, self.repository, self.branch
1062 );
1063
1064 let response = self
1065 .send_authed(self.client.get(&url).query(&[("prefix", prefix)]))
1066 .await?;
1067
1068 let status = response.status();
1069 if !status.is_success() {
1070 let error_text = response.text().await.unwrap_or_default();
1071 return Err(StorageError::ConnectionError(format!(
1072 "List failed with status {}: {}",
1073 status, error_text
1074 )));
1075 }
1076
1077 #[derive(Deserialize)]
1078 struct ListResponse {
1079 results: Vec<ObjectInfo>,
1080 }
1081
1082 #[derive(Deserialize)]
1083 struct ObjectInfo {
1084 path: String,
1085 #[serde(rename = "type")]
1086 object_type: String,
1087 }
1088
1089 let list_response: ListResponse = response.json().await.map_err(|e| {
1090 StorageError::SerializationError(format!("Failed to parse list response: {}", e))
1091 })?;
1092
1093 let paths = list_response
1094 .results
1095 .into_iter()
1096 .filter(|obj| obj.object_type == "object")
1097 .map(|obj| obj.path)
1098 .collect();
1099
1100 Ok(paths)
1101 }
1102
1103 pub async fn delete_object(&self, path: &str) -> Result<(), StorageError> {
1105 if path.is_empty() {
1106 return Err(StorageError::InvalidQuery(
1107 "Object path cannot be empty".to_string(),
1108 ));
1109 }
1110
1111 let url = format!(
1112 "{}/api/v1/repositories/{}/branches/{}/objects",
1113 self.endpoint, self.repository, self.branch
1114 );
1115
1116 let response = self
1117 .send_authed(self.client.delete(&url).query(&[("path", path)]))
1118 .await?;
1119
1120 if response.status() == reqwest::StatusCode::NOT_FOUND {
1121 return Err(StorageError::NotFound(format!(
1122 "Object not found: {}",
1123 path
1124 )));
1125 }
1126
1127 let status = response.status();
1128 if !status.is_success() {
1129 let error_text = response.text().await.unwrap_or_default();
1130 return Err(StorageError::ConnectionError(format!(
1131 "Delete failed with status {}: {}",
1132 status, error_text
1133 )));
1134 }
1135
1136 Ok(())
1137 }
1138
1139 pub async fn get_object_stats(&self, path: &str) -> Result<ObjectStats, StorageError> {
1141 if path.is_empty() {
1142 return Err(StorageError::InvalidQuery(
1143 "Object path cannot be empty".to_string(),
1144 ));
1145 }
1146
1147 let url = format!(
1148 "{}/api/v1/repositories/{}/refs/{}/objects/stat",
1149 self.endpoint, self.repository, self.branch
1150 );
1151
1152 let response = self
1153 .send_authed(self.client.get(&url).query(&[("path", path)]))
1154 .await?;
1155
1156 if response.status() == reqwest::StatusCode::NOT_FOUND {
1157 return Err(StorageError::NotFound(format!(
1158 "Object not found: {}",
1159 path
1160 )));
1161 }
1162
1163 let status = response.status();
1164 if !status.is_success() {
1165 let error_text = response.text().await.unwrap_or_default();
1166 return Err(StorageError::ConnectionError(format!(
1167 "Stats failed with status {}: {}",
1168 status, error_text
1169 )));
1170 }
1171
1172 let stats: ObjectStats = response.json().await.map_err(|e| {
1173 StorageError::SerializationError(format!("Failed to parse stats response: {}", e))
1174 })?;
1175
1176 Ok(stats)
1177 }
1178
1179 pub async fn create_repository(
1183 &self,
1184 name: &str,
1185 storage_namespace: &str,
1186 default_branch: Option<&str>,
1187 ) -> Result<RepositoryInfo, StorageError> {
1188 if name.is_empty() {
1189 return Err(StorageError::InvalidQuery(
1190 "Repository name cannot be empty".to_string(),
1191 ));
1192 }
1193 if storage_namespace.is_empty() {
1194 return Err(StorageError::InvalidQuery(
1195 "Storage namespace cannot be empty".to_string(),
1196 ));
1197 }
1198
1199 let url = format!("{}/api/v1/repositories", self.endpoint);
1200
1201 #[derive(Serialize)]
1202 struct CreateRepoRequest {
1203 name: String,
1204 storage_namespace: String,
1205 default_branch: String,
1206 }
1207
1208 let request = CreateRepoRequest {
1209 name: name.to_string(),
1210 storage_namespace: storage_namespace.to_string(),
1211 default_branch: default_branch.unwrap_or("main").to_string(),
1212 };
1213
1214 let response = self
1215 .send_authed(self.client.post(&url).json(&request))
1216 .await?;
1217
1218 if response.status() == reqwest::StatusCode::CONFLICT {
1219 return Err(StorageError::ConnectionError(format!(
1220 "Repository '{}' already exists",
1221 name
1222 )));
1223 }
1224
1225 let status = response.status();
1226 if !status.is_success() {
1227 let error_text = response.text().await.unwrap_or_default();
1228 return Err(StorageError::ConnectionError(format!(
1229 "Create repository failed with status {}: {}",
1230 status, error_text
1231 )));
1232 }
1233
1234 let info: RepositoryInfo = response.json().await.map_err(|e| {
1235 StorageError::SerializationError(format!("Failed to parse repository response: {}", e))
1236 })?;
1237
1238 Ok(info)
1239 }
1240
1241 pub async fn delete_repository(&self, name: &str) -> Result<(), StorageError> {
1243 if name.is_empty() {
1244 return Err(StorageError::InvalidQuery(
1245 "Repository name cannot be empty".to_string(),
1246 ));
1247 }
1248
1249 let url = format!("{}/api/v1/repositories/{}", self.endpoint, name);
1250
1251 let response = self.send_authed(self.client.delete(&url)).await?;
1252
1253 if response.status() == reqwest::StatusCode::NOT_FOUND {
1254 return Err(StorageError::NotFound(format!(
1255 "Repository not found: {}",
1256 name
1257 )));
1258 }
1259
1260 let status = response.status();
1261 if !status.is_success() {
1262 let error_text = response.text().await.unwrap_or_default();
1263 return Err(StorageError::ConnectionError(format!(
1264 "Delete repository failed with status {}: {}",
1265 status, error_text
1266 )));
1267 }
1268
1269 Ok(())
1270 }
1271
1272 pub async fn create_branch(
1276 &self,
1277 name: &str,
1278 source_branch: &str,
1279 ) -> Result<String, StorageError> {
1280 if name.is_empty() {
1281 return Err(StorageError::InvalidQuery(
1282 "Branch name cannot be empty".to_string(),
1283 ));
1284 }
1285 if source_branch.is_empty() {
1286 return Err(StorageError::InvalidQuery(
1287 "Source branch cannot be empty".to_string(),
1288 ));
1289 }
1290
1291 let url = format!(
1292 "{}/api/v1/repositories/{}/branches",
1293 self.endpoint, self.repository
1294 );
1295
1296 #[derive(Serialize)]
1297 struct CreateBranchRequest {
1298 name: String,
1299 source: String,
1300 }
1301
1302 let request = CreateBranchRequest {
1303 name: name.to_string(),
1304 source: source_branch.to_string(),
1305 };
1306
1307 let response = self
1308 .send_authed(self.client.post(&url).json(&request))
1309 .await?;
1310
1311 if response.status() == reqwest::StatusCode::CONFLICT {
1312 return Err(StorageError::ConnectionError(format!(
1313 "Branch '{}' already exists",
1314 name
1315 )));
1316 }
1317
1318 let status = response.status();
1319 if !status.is_success() {
1320 let error_text = response.text().await.unwrap_or_default();
1321 return Err(StorageError::ConnectionError(format!(
1322 "Create branch failed with status {}: {}",
1323 status, error_text
1324 )));
1325 }
1326
1327 let commit_id = response.text().await.map_err(|e| {
1329 StorageError::SerializationError(format!("Failed to read branch response: {}", e))
1330 })?;
1331
1332 let commit_id = commit_id.trim().trim_matches('"').to_string();
1334 Ok(commit_id)
1335 }
1336
1337 pub async fn delete_branch(&self, name: &str) -> Result<(), StorageError> {
1339 if name.is_empty() {
1340 return Err(StorageError::InvalidQuery(
1341 "Branch name cannot be empty".to_string(),
1342 ));
1343 }
1344
1345 let url = format!(
1346 "{}/api/v1/repositories/{}/branches/{}",
1347 self.endpoint, self.repository, name
1348 );
1349
1350 let response = self.send_authed(self.client.delete(&url)).await?;
1351
1352 if response.status() == reqwest::StatusCode::NOT_FOUND {
1353 return Err(StorageError::NotFound(format!(
1354 "Branch not found: {}",
1355 name
1356 )));
1357 }
1358
1359 let status = response.status();
1360 if !status.is_success() {
1361 let error_text = response.text().await.unwrap_or_default();
1362 return Err(StorageError::ConnectionError(format!(
1363 "Delete branch failed with status {}: {}",
1364 status, error_text
1365 )));
1366 }
1367
1368 Ok(())
1369 }
1370
1371 pub async fn merge_branch(
1373 &self,
1374 source_branch: &str,
1375 destination_branch: &str,
1376 message: Option<&str>,
1377 ) -> Result<MergeResult, StorageError> {
1378 if source_branch.is_empty() || destination_branch.is_empty() {
1379 return Err(StorageError::InvalidQuery(
1380 "Source and destination branches cannot be empty".to_string(),
1381 ));
1382 }
1383
1384 let url = format!(
1385 "{}/api/v1/repositories/{}/refs/{}/merge/{}",
1386 self.endpoint, self.repository, source_branch, destination_branch
1387 );
1388
1389 #[derive(Serialize)]
1390 struct MergeRequest {
1391 message: String,
1392 metadata: HashMap<String, String>,
1393 }
1394
1395 let mut metadata = HashMap::new();
1396 metadata.insert("source".to_string(), "briefcase-ai".to_string());
1397
1398 let request = MergeRequest {
1399 message: message
1400 .unwrap_or(&format!(
1401 "Merge {} into {}",
1402 source_branch, destination_branch
1403 ))
1404 .to_string(),
1405 metadata,
1406 };
1407
1408 let response = self
1409 .send_authed(self.client.post(&url).json(&request))
1410 .await?;
1411
1412 if response.status() == reqwest::StatusCode::CONFLICT {
1413 return Err(StorageError::ConnectionError(
1414 "Merge conflict detected".to_string(),
1415 ));
1416 }
1417
1418 let status = response.status();
1419 if !status.is_success() {
1420 let error_text = response.text().await.unwrap_or_default();
1421 return Err(StorageError::ConnectionError(format!(
1422 "Merge failed with status {}: {}",
1423 status, error_text
1424 )));
1425 }
1426
1427 #[derive(Deserialize)]
1429 struct MergeResponse {
1430 #[serde(default)]
1431 reference: String,
1432 #[serde(default)]
1433 summary: HashMap<String, serde_json::Value>,
1434 }
1435
1436 let merge_resp: MergeResponse = response.json().await.map_err(|e| {
1437 StorageError::SerializationError(format!("Failed to parse merge response: {}", e))
1438 })?;
1439
1440 Ok(MergeResult {
1441 commit_id: merge_resp.reference,
1442 summary: serde_json::to_string(&merge_resp.summary).unwrap_or_default(),
1443 })
1444 }
1445
1446 pub async fn get_commit(&self, commit_id: &str) -> Result<CommitInfo, StorageError> {
1450 if commit_id.is_empty() {
1451 return Err(StorageError::InvalidQuery(
1452 "Commit ID cannot be empty".to_string(),
1453 ));
1454 }
1455
1456 let url = format!(
1457 "{}/api/v1/repositories/{}/commits/{}",
1458 self.endpoint, self.repository, commit_id
1459 );
1460
1461 let response = self.send_authed(self.client.get(&url)).await?;
1462
1463 if response.status() == reqwest::StatusCode::NOT_FOUND {
1464 return Err(StorageError::NotFound(format!(
1465 "Commit not found: {}",
1466 commit_id
1467 )));
1468 }
1469
1470 let status = response.status();
1471 if !status.is_success() {
1472 let error_text = response.text().await.unwrap_or_default();
1473 return Err(StorageError::ConnectionError(format!(
1474 "Get commit failed with status {}: {}",
1475 status, error_text
1476 )));
1477 }
1478
1479 let info: CommitInfo = response.json().await.map_err(|e| {
1480 StorageError::SerializationError(format!("Failed to parse commit response: {}", e))
1481 })?;
1482
1483 Ok(info)
1484 }
1485
1486 pub async fn diff_refs(
1490 &self,
1491 left_ref: &str,
1492 right_ref: &str,
1493 ) -> Result<Vec<DiffEntry>, StorageError> {
1494 if left_ref.is_empty() || right_ref.is_empty() {
1495 return Err(StorageError::InvalidQuery(
1496 "Both refs must be non-empty".to_string(),
1497 ));
1498 }
1499
1500 let url = format!(
1501 "{}/api/v1/repositories/{}/refs/{}/diff/{}",
1502 self.endpoint, self.repository, left_ref, right_ref
1503 );
1504
1505 let response = self.send_authed(self.client.get(&url)).await?;
1506
1507 let status = response.status();
1508 if !status.is_success() {
1509 let error_text = response.text().await.unwrap_or_default();
1510 return Err(StorageError::ConnectionError(format!(
1511 "Diff failed with status {}: {}",
1512 status, error_text
1513 )));
1514 }
1515
1516 #[derive(Deserialize)]
1517 struct DiffResponse {
1518 results: Vec<DiffEntry>,
1519 }
1520
1521 let diff_resp: DiffResponse = response.json().await.map_err(|e| {
1522 StorageError::SerializationError(format!("Failed to parse diff response: {}", e))
1523 })?;
1524
1525 Ok(diff_resp.results)
1526 }
1527
1528 pub async fn register_action(&self, action: &LakeFSAction) -> Result<String, StorageError> {
1533 if action.name.is_empty() {
1534 return Err(StorageError::InvalidQuery(
1535 "Action name cannot be empty".to_string(),
1536 ));
1537 }
1538 if action.on.is_empty() {
1539 return Err(StorageError::InvalidQuery(
1540 "Action must have at least one event trigger".to_string(),
1541 ));
1542 }
1543 if action.hooks.is_empty() {
1544 return Err(StorageError::InvalidQuery(
1545 "Action must have at least one hook".to_string(),
1546 ));
1547 }
1548
1549 let yaml_content = self.build_action_yaml(action);
1551
1552 let path = format!("_lakefs_actions/{}.yaml", action.name);
1554 self.upload_object(&path, yaml_content.as_bytes()).await?;
1555
1556 let commit_msg = format!("Register LakeFS Action: {}", action.name);
1558 let commit_id = self.create_commit(&commit_msg).await?;
1559
1560 Ok(commit_id)
1561 }
1562
1563 fn build_action_yaml(&self, action: &LakeFSAction) -> String {
1565 let mut yaml = String::new();
1566 yaml.push_str(&format!("name: {}\n", action.name));
1567 yaml.push_str("on:\n");
1568 for event in &action.on {
1569 yaml.push_str(&format!(" {}:\n", event));
1570 }
1571 yaml.push_str("hooks:\n");
1572 for hook in &action.hooks {
1573 yaml.push_str(&format!(" - id: {}\n", hook.id));
1574 yaml.push_str(&format!(" type: {}\n", hook.hook_type));
1575 yaml.push_str(&format!(" description: {}\n", hook.description));
1576 if !hook.properties.is_empty() {
1577 yaml.push_str(" properties:\n");
1578 for (k, v) in &hook.properties {
1579 yaml.push_str(&format!(" {}: {}\n", k, v));
1580 }
1581 }
1582 }
1583 yaml
1584 }
1585
1586 fn snapshot_path(&self, snapshot_id: &str) -> String {
1590 format!("snapshots/{}.json", snapshot_id)
1591 }
1592
1593 fn decision_path(&self, decision_id: &str) -> String {
1595 format!("decisions/{}.json", decision_id)
1596 }
1597}
1598
1599#[cfg(all(feature = "async", feature = "networking"))]
1602#[async_trait::async_trait]
1603impl StorageBackend for LakeFSBackend {
1604 async fn save(&self, snapshot: &Snapshot) -> Result<String, StorageError> {
1605 let snapshot_id = snapshot.metadata.snapshot_id.to_string();
1606
1607 {
1609 let mut pending = self.pending_writes.lock().unwrap();
1610 pending.push(snapshot.clone());
1611 }
1612
1613 Ok(snapshot_id)
1614 }
1615
1616 async fn save_decision(&self, decision: &DecisionSnapshot) -> Result<String, StorageError> {
1617 let decision_id = decision.metadata.snapshot_id.to_string();
1618 let path = self.decision_path(&decision_id);
1619
1620 let json_data = serde_json::to_vec(decision)
1621 .map_err(|e| StorageError::SerializationError(e.to_string()))?;
1622
1623 self.upload_object(&path, &json_data).await?;
1624
1625 Ok(decision_id)
1626 }
1627
1628 async fn load(&self, snapshot_id: &str) -> Result<Snapshot, StorageError> {
1629 let path = self.snapshot_path(snapshot_id);
1630 let data = self.download_object(&path).await?;
1631
1632 let snapshot: Snapshot = serde_json::from_slice(&data)
1633 .map_err(|e| StorageError::SerializationError(e.to_string()))?;
1634
1635 Ok(snapshot)
1636 }
1637
1638 async fn load_decision(&self, decision_id: &str) -> Result<DecisionSnapshot, StorageError> {
1639 let path = self.decision_path(decision_id);
1640 let data = self.download_object(&path).await?;
1641
1642 let decision: DecisionSnapshot = serde_json::from_slice(&data)
1643 .map_err(|e| StorageError::SerializationError(e.to_string()))?;
1644
1645 Ok(decision)
1646 }
1647
1648 async fn query(&self, query: SnapshotQuery) -> Result<Vec<Snapshot>, StorageError> {
1649 let paths = self.list_objects("snapshots/").await?;
1651
1652 let mut snapshots = Vec::new();
1653 let mut count = 0;
1654 let offset = query.offset.unwrap_or(0);
1655 let limit = query.limit.unwrap_or(usize::MAX);
1656
1657 for path in paths {
1658 if let Some(filename) = path.split('/').next_back() {
1659 if let Some(snapshot_id) = filename.strip_suffix(".json") {
1660 match self.load(snapshot_id).await {
1662 Ok(snapshot) => {
1663 if self.matches_query(&snapshot, &query) {
1665 if count >= offset {
1666 snapshots.push(snapshot);
1667 if snapshots.len() >= limit {
1668 break;
1669 }
1670 }
1671 count += 1;
1672 }
1673 }
1674 Err(_) => continue, }
1676 }
1677 }
1678 }
1679
1680 snapshots.sort_by(|a, b| b.metadata.timestamp.cmp(&a.metadata.timestamp));
1682
1683 Ok(snapshots)
1684 }
1685
1686 async fn delete(&self, snapshot_id: &str) -> Result<bool, StorageError> {
1687 let path = self.snapshot_path(snapshot_id);
1688 match self.delete_object(&path).await {
1689 Ok(()) => Ok(true),
1690 Err(StorageError::NotFound(_)) => Ok(false),
1691 Err(e) => Err(e),
1692 }
1693 }
1694
1695 async fn flush(&self) -> Result<FlushResult, StorageError> {
1696 let pending_snapshots = {
1697 let mut pending = self.pending_writes.lock().unwrap();
1698 let snapshots = pending.clone();
1699 pending.clear();
1700 snapshots
1701 };
1702
1703 if pending_snapshots.is_empty() {
1704 return Ok(FlushResult {
1705 snapshots_written: 0,
1706 bytes_written: 0,
1707 checkpoint_id: None,
1708 });
1709 }
1710
1711 let mut bytes_written = 0;
1712
1713 for snapshot in &pending_snapshots {
1715 let snapshot_id = snapshot.metadata.snapshot_id.to_string();
1716 let path = self.snapshot_path(&snapshot_id);
1717
1718 let json_data = serde_json::to_vec(snapshot)
1719 .map_err(|e| StorageError::SerializationError(e.to_string()))?;
1720
1721 bytes_written += json_data.len();
1722
1723 self.upload_object(&path, &json_data).await?;
1724
1725 for decision in &snapshot.decisions {
1727 let decision_id = decision.metadata.snapshot_id.to_string();
1728 let decision_path = self.decision_path(&decision_id);
1729
1730 let decision_data = serde_json::to_vec(decision)
1731 .map_err(|e| StorageError::SerializationError(e.to_string()))?;
1732
1733 bytes_written += decision_data.len();
1734 self.upload_object(&decision_path, &decision_data).await?;
1735 }
1736 }
1737
1738 let commit_message = format!("Briefcase AI flush: {} snapshots", pending_snapshots.len());
1740 let commit_id = self.create_commit(&commit_message).await?;
1741
1742 Ok(FlushResult {
1743 snapshots_written: pending_snapshots.len(),
1744 bytes_written,
1745 checkpoint_id: Some(commit_id),
1746 })
1747 }
1748
1749 async fn health_check(&self) -> Result<bool, StorageError> {
1750 let url = format!("{}/api/v1/repositories/{}", self.endpoint, self.repository);
1751
1752 let response = self
1753 .send_authed(self.client.get(&url))
1754 .await
1755 .map_err(|e| StorageError::ConnectionError(format!("Health check failed: {}", e)))?;
1756
1757 Ok(response.status().is_success())
1758 }
1759}
1760
1761#[cfg(all(feature = "async", feature = "networking"))]
1762impl LakeFSBackend {
1763 fn matches_query(&self, snapshot: &Snapshot, query: &SnapshotQuery) -> bool {
1765 if let Some(start_time) = query.start_time {
1767 if snapshot.metadata.timestamp < start_time {
1768 return false;
1769 }
1770 }
1771
1772 if let Some(end_time) = query.end_time {
1773 if snapshot.metadata.timestamp > end_time {
1774 return false;
1775 }
1776 }
1777
1778 if query.function_name.is_some()
1780 || query.module_name.is_some()
1781 || query.model_name.is_some()
1782 || query.tags.is_some()
1783 {
1784 let mut found_match = false;
1785
1786 for decision in &snapshot.decisions {
1787 let mut decision_matches = true;
1788
1789 if let Some(function_name) = &query.function_name {
1790 if decision.function_name != *function_name {
1791 decision_matches = false;
1792 }
1793 }
1794
1795 if let Some(module_name) = &query.module_name {
1796 if decision.module_name.as_ref() != Some(module_name) {
1797 decision_matches = false;
1798 }
1799 }
1800
1801 if let Some(model_name) = &query.model_name {
1802 if let Some(model_params) = &decision.model_parameters {
1803 if model_params.model_name != *model_name {
1804 decision_matches = false;
1805 }
1806 } else {
1807 decision_matches = false;
1808 }
1809 }
1810
1811 if let Some(query_tags) = &query.tags {
1812 for (key, value) in query_tags {
1813 if decision.tags.get(key) != Some(value) {
1814 decision_matches = false;
1815 break;
1816 }
1817 }
1818 }
1819
1820 if decision_matches {
1821 found_match = true;
1822 break;
1823 }
1824 }
1825
1826 if !found_match {
1827 return false;
1828 }
1829 }
1830
1831 true
1832 }
1833}
1834
1835#[derive(Debug, Clone)]
1838pub struct LakeFSConfig {
1839 pub endpoint: String,
1840 pub repository: String,
1841 pub branch: String,
1842 pub access_key: String,
1843 pub secret_key: String,
1844 pub auth: Option<LakeFSAuth>,
1847}
1848
1849impl LakeFSConfig {
1850 pub fn new(
1851 endpoint: impl Into<String>,
1852 repository: impl Into<String>,
1853 branch: impl Into<String>,
1854 access_key: impl Into<String>,
1855 secret_key: impl Into<String>,
1856 ) -> Self {
1857 Self {
1858 endpoint: endpoint.into(),
1859 repository: repository.into(),
1860 branch: branch.into(),
1861 access_key: access_key.into(),
1862 secret_key: secret_key.into(),
1863 auth: None,
1864 }
1865 }
1866
1867 pub fn with_auth(mut self, auth: LakeFSAuth) -> Self {
1869 self.auth = Some(auth);
1870 self
1871 }
1872}
1873
1874#[cfg(test)]
1877mod tests {
1878 use super::*;
1879 use crate::models::*;
1880 use serde_json::json;
1881 use wiremock::matchers::{method, path, query_param};
1882 use wiremock::{Mock, MockServer, ResponseTemplate};
1883
1884 fn create_test_config_with_endpoint(endpoint: &str) -> LakeFSConfig {
1885 LakeFSConfig::new(
1886 endpoint,
1887 "briefcase-test",
1888 "main",
1889 "test_key",
1890 "test_secret",
1891 )
1892 }
1893
1894 fn create_test_config() -> LakeFSConfig {
1895 LakeFSConfig::new(
1896 "http://localhost:8000",
1897 "briefcase-test",
1898 "main",
1899 "test_access_key",
1900 "test_secret_key",
1901 )
1902 }
1903
1904 async fn create_test_snapshot() -> Snapshot {
1905 let input = Input::new("test_input", json!("value"), "string");
1906 let output = Output::new("test_output", json!("result"), "string");
1907 let model_params = ModelParameters::new("gpt-4");
1908
1909 let decision = DecisionSnapshot::new("test_function")
1910 .with_module("test_module")
1911 .add_input(input)
1912 .add_output(output)
1913 .with_model_parameters(model_params)
1914 .add_tag("env", "test");
1915
1916 let mut snapshot = Snapshot::new(SnapshotType::Session);
1917 snapshot.add_decision(decision);
1918 snapshot
1919 }
1920
1921 #[tokio::test]
1924 async fn test_lakefs_config_creation() {
1925 let config = create_test_config();
1926 assert_eq!(config.endpoint, "http://localhost:8000");
1927 assert_eq!(config.repository, "briefcase-test");
1928 assert_eq!(config.branch, "main");
1929 }
1930
1931 #[tokio::test]
1932 async fn test_object_paths() {
1933 let config = create_test_config();
1934 let backend = LakeFSBackend::new(config).unwrap();
1935
1936 let snapshot_id = "test-snapshot-123";
1937 let decision_id = "test-decision-456";
1938
1939 assert_eq!(
1940 backend.snapshot_path(snapshot_id),
1941 "snapshots/test-snapshot-123.json"
1942 );
1943 assert_eq!(
1944 backend.decision_path(decision_id),
1945 "decisions/test-decision-456.json"
1946 );
1947 }
1948
1949 #[tokio::test]
1950 async fn test_query_matching() {
1951 let config = create_test_config();
1952 let backend = LakeFSBackend::new(config).unwrap();
1953 let snapshot = create_test_snapshot().await;
1954
1955 let query = SnapshotQuery::new().with_function_name("test_function");
1957 assert!(backend.matches_query(&snapshot, &query));
1958
1959 let query = SnapshotQuery::new().with_function_name("other_function");
1960 assert!(!backend.matches_query(&snapshot, &query));
1961
1962 let query = SnapshotQuery::new().with_tag("env", "test");
1964 assert!(backend.matches_query(&snapshot, &query));
1965
1966 let query = SnapshotQuery::new().with_tag("env", "prod");
1967 assert!(!backend.matches_query(&snapshot, &query));
1968
1969 let query = SnapshotQuery::new().with_model_name("gpt-4");
1971 assert!(backend.matches_query(&snapshot, &query));
1972
1973 let query = SnapshotQuery::new().with_model_name("claude-3");
1974 assert!(!backend.matches_query(&snapshot, &query));
1975 }
1976
1977 #[tokio::test]
1978 async fn test_pending_writes() {
1979 let config = create_test_config();
1980 let backend = LakeFSBackend::new(config).unwrap();
1981 let snapshot = create_test_snapshot().await;
1982
1983 let snapshot_id = backend.save(&snapshot).await.unwrap();
1985 assert_eq!(snapshot_id, snapshot.metadata.snapshot_id.to_string());
1986
1987 {
1989 let pending = backend.pending_writes.lock().unwrap();
1990 assert_eq!(pending.len(), 1);
1991 }
1992 }
1993
1994 #[tokio::test]
1995 async fn test_accessors() {
1996 let config = create_test_config();
1997 let backend = LakeFSBackend::new(config).unwrap();
1998
1999 assert_eq!(backend.repository(), "briefcase-test");
2000 assert_eq!(backend.branch(), "main");
2001 assert_eq!(backend.endpoint(), "http://localhost:8000");
2002 }
2003
2004 #[tokio::test]
2007 async fn test_create_repository_empty_name() {
2008 let config = create_test_config();
2009 let backend = LakeFSBackend::new(config).unwrap();
2010
2011 let result = backend.create_repository("", "s3://bucket", None).await;
2012 assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2013 }
2014
2015 #[tokio::test]
2016 async fn test_create_repository_empty_namespace() {
2017 let config = create_test_config();
2018 let backend = LakeFSBackend::new(config).unwrap();
2019
2020 let result = backend.create_repository("my-repo", "", None).await;
2021 assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2022 }
2023
2024 #[tokio::test]
2025 async fn test_delete_repository_empty_name() {
2026 let config = create_test_config();
2027 let backend = LakeFSBackend::new(config).unwrap();
2028
2029 let result = backend.delete_repository("").await;
2030 assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2031 }
2032
2033 #[tokio::test]
2034 async fn test_create_branch_empty_name() {
2035 let config = create_test_config();
2036 let backend = LakeFSBackend::new(config).unwrap();
2037
2038 let result = backend.create_branch("", "main").await;
2039 assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2040 }
2041
2042 #[tokio::test]
2043 async fn test_create_branch_empty_source() {
2044 let config = create_test_config();
2045 let backend = LakeFSBackend::new(config).unwrap();
2046
2047 let result = backend.create_branch("feature", "").await;
2048 assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2049 }
2050
2051 #[tokio::test]
2052 async fn test_delete_branch_empty_name() {
2053 let config = create_test_config();
2054 let backend = LakeFSBackend::new(config).unwrap();
2055
2056 let result = backend.delete_branch("").await;
2057 assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2058 }
2059
2060 #[tokio::test]
2061 async fn test_merge_branch_empty_source() {
2062 let config = create_test_config();
2063 let backend = LakeFSBackend::new(config).unwrap();
2064
2065 let result = backend.merge_branch("", "main", None).await;
2066 assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2067 }
2068
2069 #[tokio::test]
2070 async fn test_merge_branch_empty_destination() {
2071 let config = create_test_config();
2072 let backend = LakeFSBackend::new(config).unwrap();
2073
2074 let result = backend.merge_branch("feature", "", None).await;
2075 assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2076 }
2077
2078 #[tokio::test]
2079 async fn test_get_commit_empty_id() {
2080 let config = create_test_config();
2081 let backend = LakeFSBackend::new(config).unwrap();
2082
2083 let result = backend.get_commit("").await;
2084 assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2085 }
2086
2087 #[tokio::test]
2088 async fn test_diff_refs_empty_left() {
2089 let config = create_test_config();
2090 let backend = LakeFSBackend::new(config).unwrap();
2091
2092 let result = backend.diff_refs("", "main").await;
2093 assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2094 }
2095
2096 #[tokio::test]
2097 async fn test_diff_refs_empty_right() {
2098 let config = create_test_config();
2099 let backend = LakeFSBackend::new(config).unwrap();
2100
2101 let result = backend.diff_refs("feature", "").await;
2102 assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2103 }
2104
2105 #[tokio::test]
2106 async fn test_delete_object_empty_path() {
2107 let config = create_test_config();
2108 let backend = LakeFSBackend::new(config).unwrap();
2109
2110 let result = backend.delete_object("").await;
2111 assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2112 }
2113
2114 #[tokio::test]
2115 async fn test_get_object_stats_empty_path() {
2116 let config = create_test_config();
2117 let backend = LakeFSBackend::new(config).unwrap();
2118
2119 let result = backend.get_object_stats("").await;
2120 assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2121 }
2122
2123 #[tokio::test]
2124 async fn test_register_action_empty_name() {
2125 let config = create_test_config();
2126 let backend = LakeFSBackend::new(config).unwrap();
2127
2128 let action = LakeFSAction {
2129 name: "".to_string(),
2130 on: vec!["post-commit".to_string()],
2131 hooks: vec![ActionHook {
2132 id: "hook1".to_string(),
2133 hook_type: "webhook".to_string(),
2134 description: "test".to_string(),
2135 properties: HashMap::new(),
2136 }],
2137 };
2138
2139 let result = backend.register_action(&action).await;
2140 assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2141 }
2142
2143 #[tokio::test]
2144 async fn test_register_action_no_events() {
2145 let config = create_test_config();
2146 let backend = LakeFSBackend::new(config).unwrap();
2147
2148 let action = LakeFSAction {
2149 name: "test-action".to_string(),
2150 on: vec![],
2151 hooks: vec![ActionHook {
2152 id: "hook1".to_string(),
2153 hook_type: "webhook".to_string(),
2154 description: "test".to_string(),
2155 properties: HashMap::new(),
2156 }],
2157 };
2158
2159 let result = backend.register_action(&action).await;
2160 assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2161 }
2162
2163 #[tokio::test]
2164 async fn test_register_action_no_hooks() {
2165 let config = create_test_config();
2166 let backend = LakeFSBackend::new(config).unwrap();
2167
2168 let action = LakeFSAction {
2169 name: "test-action".to_string(),
2170 on: vec!["post-commit".to_string()],
2171 hooks: vec![],
2172 };
2173
2174 let result = backend.register_action(&action).await;
2175 assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2176 }
2177
2178 #[tokio::test]
2181 async fn test_build_action_yaml() {
2182 let config = create_test_config();
2183 let backend = LakeFSBackend::new(config).unwrap();
2184
2185 let mut props = HashMap::new();
2186 props.insert("url".to_string(), "http://localhost:9000/hook".to_string());
2187
2188 let action = LakeFSAction {
2189 name: "briefcase-post-commit".to_string(),
2190 on: vec!["post-commit".to_string()],
2191 hooks: vec![ActionHook {
2192 id: "notify".to_string(),
2193 hook_type: "webhook".to_string(),
2194 description: "Notify Briefcase server".to_string(),
2195 properties: props,
2196 }],
2197 };
2198
2199 let yaml = backend.build_action_yaml(&action);
2200 assert!(yaml.contains("name: briefcase-post-commit"));
2201 assert!(yaml.contains("post-commit:"));
2202 assert!(yaml.contains("id: notify"));
2203 assert!(yaml.contains("type: webhook"));
2204 assert!(yaml.contains("url: http://localhost:9000/hook"));
2205 }
2206
2207 #[tokio::test]
2208 async fn test_build_action_yaml_multiple_events() {
2209 let config = create_test_config();
2210 let backend = LakeFSBackend::new(config).unwrap();
2211
2212 let action = LakeFSAction {
2213 name: "multi-event".to_string(),
2214 on: vec!["pre-commit".to_string(), "post-merge".to_string()],
2215 hooks: vec![ActionHook {
2216 id: "h1".to_string(),
2217 hook_type: "webhook".to_string(),
2218 description: "Hook".to_string(),
2219 properties: HashMap::new(),
2220 }],
2221 };
2222
2223 let yaml = backend.build_action_yaml(&action);
2224 assert!(yaml.contains("pre-commit:"));
2225 assert!(yaml.contains("post-merge:"));
2226 }
2227
2228 #[tokio::test]
2231 async fn test_flush_empty_pending() {
2232 let config = create_test_config();
2233 let backend = LakeFSBackend::new(config).unwrap();
2234
2235 let result = backend.flush().await.unwrap();
2236 assert_eq!(result.snapshots_written, 0);
2237 assert_eq!(result.bytes_written, 0);
2238 assert!(result.checkpoint_id.is_none());
2239 }
2240
2241 #[tokio::test]
2244 async fn test_create_repository_success() {
2245 let mock_server = MockServer::start().await;
2246 let config = create_test_config_with_endpoint(&mock_server.uri());
2247 let backend = LakeFSBackend::new(config).unwrap();
2248
2249 Mock::given(method("POST"))
2250 .and(path("/api/v1/repositories"))
2251 .respond_with(ResponseTemplate::new(201).set_body_json(json!({
2252 "id": "my-engagement",
2253 "storage_namespace": "s3://my-bucket/engagement",
2254 "default_branch": "main",
2255 "creation_date": 1700000000
2256 })))
2257 .expect(1)
2258 .mount(&mock_server)
2259 .await;
2260
2261 let result = backend
2262 .create_repository("my-engagement", "s3://my-bucket/engagement", None)
2263 .await
2264 .unwrap();
2265
2266 assert_eq!(result.id, "my-engagement");
2267 assert_eq!(result.storage_namespace, "s3://my-bucket/engagement");
2268 assert_eq!(result.default_branch, "main");
2269 }
2270
2271 #[tokio::test]
2272 async fn test_create_repository_conflict() {
2273 let mock_server = MockServer::start().await;
2274 let config = create_test_config_with_endpoint(&mock_server.uri());
2275 let backend = LakeFSBackend::new(config).unwrap();
2276
2277 Mock::given(method("POST"))
2278 .and(path("/api/v1/repositories"))
2279 .respond_with(ResponseTemplate::new(409).set_body_json(json!({
2280 "message": "repository already exists"
2281 })))
2282 .expect(1)
2283 .mount(&mock_server)
2284 .await;
2285
2286 let result = backend
2287 .create_repository("existing-repo", "s3://bucket", None)
2288 .await;
2289
2290 assert!(result.is_err());
2291 let err = result.unwrap_err();
2292 assert!(
2293 matches!(err, StorageError::ConnectionError(msg) if msg.contains("already exists"))
2294 );
2295 }
2296
2297 #[tokio::test]
2298 async fn test_create_repository_custom_branch() {
2299 let mock_server = MockServer::start().await;
2300 let config = create_test_config_with_endpoint(&mock_server.uri());
2301 let backend = LakeFSBackend::new(config).unwrap();
2302
2303 Mock::given(method("POST"))
2304 .and(path("/api/v1/repositories"))
2305 .respond_with(ResponseTemplate::new(201).set_body_json(json!({
2306 "id": "my-repo",
2307 "storage_namespace": "s3://bucket",
2308 "default_branch": "develop",
2309 "creation_date": 1700000000
2310 })))
2311 .expect(1)
2312 .mount(&mock_server)
2313 .await;
2314
2315 let result = backend
2316 .create_repository("my-repo", "s3://bucket", Some("develop"))
2317 .await
2318 .unwrap();
2319
2320 assert_eq!(result.default_branch, "develop");
2321 }
2322
2323 #[tokio::test]
2324 async fn test_delete_repository_success() {
2325 let mock_server = MockServer::start().await;
2326 let config = create_test_config_with_endpoint(&mock_server.uri());
2327 let backend = LakeFSBackend::new(config).unwrap();
2328
2329 Mock::given(method("DELETE"))
2330 .and(path("/api/v1/repositories/test-repo"))
2331 .respond_with(ResponseTemplate::new(204))
2332 .expect(1)
2333 .mount(&mock_server)
2334 .await;
2335
2336 backend.delete_repository("test-repo").await.unwrap();
2337 }
2338
2339 #[tokio::test]
2340 async fn test_delete_repository_not_found() {
2341 let mock_server = MockServer::start().await;
2342 let config = create_test_config_with_endpoint(&mock_server.uri());
2343 let backend = LakeFSBackend::new(config).unwrap();
2344
2345 Mock::given(method("DELETE"))
2346 .and(path("/api/v1/repositories/nonexistent"))
2347 .respond_with(ResponseTemplate::new(404))
2348 .expect(1)
2349 .mount(&mock_server)
2350 .await;
2351
2352 let result = backend.delete_repository("nonexistent").await;
2353 assert!(matches!(result, Err(StorageError::NotFound(_))));
2354 }
2355
2356 #[tokio::test]
2357 async fn test_create_branch_success() {
2358 let mock_server = MockServer::start().await;
2359 let config = create_test_config_with_endpoint(&mock_server.uri());
2360 let backend = LakeFSBackend::new(config).unwrap();
2361
2362 Mock::given(method("POST"))
2363 .and(path("/api/v1/repositories/briefcase-test/branches"))
2364 .respond_with(ResponseTemplate::new(201).set_body_string("\"abc123def456\""))
2365 .expect(1)
2366 .mount(&mock_server)
2367 .await;
2368
2369 let commit_id = backend.create_branch("feature-x", "main").await.unwrap();
2370 assert_eq!(commit_id, "abc123def456");
2371 }
2372
2373 #[tokio::test]
2374 async fn test_create_branch_conflict() {
2375 let mock_server = MockServer::start().await;
2376 let config = create_test_config_with_endpoint(&mock_server.uri());
2377 let backend = LakeFSBackend::new(config).unwrap();
2378
2379 Mock::given(method("POST"))
2380 .and(path("/api/v1/repositories/briefcase-test/branches"))
2381 .respond_with(ResponseTemplate::new(409))
2382 .expect(1)
2383 .mount(&mock_server)
2384 .await;
2385
2386 let result = backend.create_branch("existing-branch", "main").await;
2387 assert!(result.is_err());
2388 }
2389
2390 #[tokio::test]
2391 async fn test_delete_branch_success() {
2392 let mock_server = MockServer::start().await;
2393 let config = create_test_config_with_endpoint(&mock_server.uri());
2394 let backend = LakeFSBackend::new(config).unwrap();
2395
2396 Mock::given(method("DELETE"))
2397 .and(path(
2398 "/api/v1/repositories/briefcase-test/branches/feature-x",
2399 ))
2400 .respond_with(ResponseTemplate::new(204))
2401 .expect(1)
2402 .mount(&mock_server)
2403 .await;
2404
2405 backend.delete_branch("feature-x").await.unwrap();
2406 }
2407
2408 #[tokio::test]
2409 async fn test_delete_branch_not_found() {
2410 let mock_server = MockServer::start().await;
2411 let config = create_test_config_with_endpoint(&mock_server.uri());
2412 let backend = LakeFSBackend::new(config).unwrap();
2413
2414 Mock::given(method("DELETE"))
2415 .and(path(
2416 "/api/v1/repositories/briefcase-test/branches/nonexistent",
2417 ))
2418 .respond_with(ResponseTemplate::new(404))
2419 .expect(1)
2420 .mount(&mock_server)
2421 .await;
2422
2423 let result = backend.delete_branch("nonexistent").await;
2424 assert!(matches!(result, Err(StorageError::NotFound(_))));
2425 }
2426
2427 #[tokio::test]
2428 async fn test_merge_branch_success() {
2429 let mock_server = MockServer::start().await;
2430 let config = create_test_config_with_endpoint(&mock_server.uri());
2431 let backend = LakeFSBackend::new(config).unwrap();
2432
2433 Mock::given(method("POST"))
2434 .and(path(
2435 "/api/v1/repositories/briefcase-test/refs/feature-x/merge/main",
2436 ))
2437 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2438 "reference": "merge-commit-abc123",
2439 "summary": {"added": 3, "removed": 0, "changed": 1}
2440 })))
2441 .expect(1)
2442 .mount(&mock_server)
2443 .await;
2444
2445 let result = backend
2446 .merge_branch("feature-x", "main", Some("Merge feature-x"))
2447 .await
2448 .unwrap();
2449
2450 assert_eq!(result.commit_id, "merge-commit-abc123");
2451 assert!(result.summary.contains("added"));
2452 }
2453
2454 #[tokio::test]
2455 async fn test_merge_branch_conflict() {
2456 let mock_server = MockServer::start().await;
2457 let config = create_test_config_with_endpoint(&mock_server.uri());
2458 let backend = LakeFSBackend::new(config).unwrap();
2459
2460 Mock::given(method("POST"))
2461 .and(path(
2462 "/api/v1/repositories/briefcase-test/refs/feature-x/merge/main",
2463 ))
2464 .respond_with(ResponseTemplate::new(409).set_body_json(json!({
2465 "message": "conflict"
2466 })))
2467 .expect(1)
2468 .mount(&mock_server)
2469 .await;
2470
2471 let result = backend.merge_branch("feature-x", "main", None).await;
2472 assert!(result.is_err());
2473 let err = result.unwrap_err();
2474 assert!(matches!(err, StorageError::ConnectionError(msg) if msg.contains("conflict")));
2475 }
2476
2477 #[tokio::test]
2478 async fn test_merge_branch_default_message() {
2479 let mock_server = MockServer::start().await;
2480 let config = create_test_config_with_endpoint(&mock_server.uri());
2481 let backend = LakeFSBackend::new(config).unwrap();
2482
2483 Mock::given(method("POST"))
2484 .and(path(
2485 "/api/v1/repositories/briefcase-test/refs/src/merge/dst",
2486 ))
2487 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2488 "reference": "commit-xyz",
2489 "summary": {}
2490 })))
2491 .expect(1)
2492 .mount(&mock_server)
2493 .await;
2494
2495 let result = backend.merge_branch("src", "dst", None).await.unwrap();
2496 assert_eq!(result.commit_id, "commit-xyz");
2497 }
2498
2499 #[tokio::test]
2500 async fn test_get_commit_success() {
2501 let mock_server = MockServer::start().await;
2502 let config = create_test_config_with_endpoint(&mock_server.uri());
2503 let backend = LakeFSBackend::new(config).unwrap();
2504
2505 Mock::given(method("GET"))
2506 .and(path("/api/v1/repositories/briefcase-test/commits/abc123"))
2507 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2508 "id": "abc123",
2509 "message": "Initial commit",
2510 "committer": "admin",
2511 "metadata": {"source": "briefcase-ai"},
2512 "creation_date": 1700000000
2513 })))
2514 .expect(1)
2515 .mount(&mock_server)
2516 .await;
2517
2518 let commit = backend.get_commit("abc123").await.unwrap();
2519 assert_eq!(commit.id, "abc123");
2520 assert_eq!(commit.message, "Initial commit");
2521 assert_eq!(commit.committer, "admin");
2522 assert_eq!(
2523 commit.metadata.get("source"),
2524 Some(&"briefcase-ai".to_string())
2525 );
2526 }
2527
2528 #[tokio::test]
2529 async fn test_get_commit_not_found() {
2530 let mock_server = MockServer::start().await;
2531 let config = create_test_config_with_endpoint(&mock_server.uri());
2532 let backend = LakeFSBackend::new(config).unwrap();
2533
2534 Mock::given(method("GET"))
2535 .and(path(
2536 "/api/v1/repositories/briefcase-test/commits/nonexistent",
2537 ))
2538 .respond_with(ResponseTemplate::new(404))
2539 .expect(1)
2540 .mount(&mock_server)
2541 .await;
2542
2543 let result = backend.get_commit("nonexistent").await;
2544 assert!(matches!(result, Err(StorageError::NotFound(_))));
2545 }
2546
2547 #[tokio::test]
2548 async fn test_diff_refs_success() {
2549 let mock_server = MockServer::start().await;
2550 let config = create_test_config_with_endpoint(&mock_server.uri());
2551 let backend = LakeFSBackend::new(config).unwrap();
2552
2553 Mock::given(method("GET"))
2554 .and(path(
2555 "/api/v1/repositories/briefcase-test/refs/main/diff/feature-x",
2556 ))
2557 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2558 "results": [
2559 {"path": "snapshots/a.json", "type": "added", "size_bytes": 1024},
2560 {"path": "snapshots/b.json", "type": "changed", "size_bytes": 2048},
2561 {"path": "decisions/old.json", "type": "removed"}
2562 ]
2563 })))
2564 .expect(1)
2565 .mount(&mock_server)
2566 .await;
2567
2568 let diffs = backend.diff_refs("main", "feature-x").await.unwrap();
2569 assert_eq!(diffs.len(), 3);
2570 assert_eq!(diffs[0].path, "snapshots/a.json");
2571 assert_eq!(diffs[0].diff_type, "added");
2572 assert_eq!(diffs[0].size_bytes, Some(1024));
2573 assert_eq!(diffs[1].diff_type, "changed");
2574 assert_eq!(diffs[2].diff_type, "removed");
2575 assert_eq!(diffs[2].size_bytes, None);
2576 }
2577
2578 #[tokio::test]
2579 async fn test_diff_refs_empty_result() {
2580 let mock_server = MockServer::start().await;
2581 let config = create_test_config_with_endpoint(&mock_server.uri());
2582 let backend = LakeFSBackend::new(config).unwrap();
2583
2584 Mock::given(method("GET"))
2585 .and(path(
2586 "/api/v1/repositories/briefcase-test/refs/main/diff/main",
2587 ))
2588 .respond_with(ResponseTemplate::new(200).set_body_json(json!({"results": []})))
2589 .expect(1)
2590 .mount(&mock_server)
2591 .await;
2592
2593 let diffs = backend.diff_refs("main", "main").await.unwrap();
2594 assert!(diffs.is_empty());
2595 }
2596
2597 #[tokio::test]
2598 async fn test_get_object_stats_success() {
2599 let mock_server = MockServer::start().await;
2600 let config = create_test_config_with_endpoint(&mock_server.uri());
2601 let backend = LakeFSBackend::new(config).unwrap();
2602
2603 Mock::given(method("GET"))
2604 .and(path(
2605 "/api/v1/repositories/briefcase-test/refs/main/objects/stat",
2606 ))
2607 .and(query_param("path", "snapshots/test.json"))
2608 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2609 "path": "snapshots/test.json",
2610 "physical_address": "s3://bucket/data/abc",
2611 "size_bytes": 4096,
2612 "content_type": "application/json",
2613 "mtime": 1700000000,
2614 "checksum": "sha256:abcdef"
2615 })))
2616 .expect(1)
2617 .mount(&mock_server)
2618 .await;
2619
2620 let stats = backend
2621 .get_object_stats("snapshots/test.json")
2622 .await
2623 .unwrap();
2624
2625 assert_eq!(stats.path, "snapshots/test.json");
2626 assert_eq!(stats.size_bytes, 4096);
2627 assert_eq!(stats.content_type, "application/json");
2628 assert_eq!(stats.checksum, "sha256:abcdef");
2629 }
2630
2631 #[tokio::test]
2632 async fn test_get_object_stats_not_found() {
2633 let mock_server = MockServer::start().await;
2634 let config = create_test_config_with_endpoint(&mock_server.uri());
2635 let backend = LakeFSBackend::new(config).unwrap();
2636
2637 Mock::given(method("GET"))
2638 .and(path(
2639 "/api/v1/repositories/briefcase-test/refs/main/objects/stat",
2640 ))
2641 .respond_with(ResponseTemplate::new(404))
2642 .expect(1)
2643 .mount(&mock_server)
2644 .await;
2645
2646 let result = backend.get_object_stats("nonexistent.json").await;
2647 assert!(matches!(result, Err(StorageError::NotFound(_))));
2648 }
2649
2650 #[tokio::test]
2651 async fn test_delete_object_success() {
2652 let mock_server = MockServer::start().await;
2653 let config = create_test_config_with_endpoint(&mock_server.uri());
2654 let backend = LakeFSBackend::new(config).unwrap();
2655
2656 Mock::given(method("DELETE"))
2657 .and(path(
2658 "/api/v1/repositories/briefcase-test/branches/main/objects",
2659 ))
2660 .and(query_param("path", "snapshots/old.json"))
2661 .respond_with(ResponseTemplate::new(204))
2662 .expect(1)
2663 .mount(&mock_server)
2664 .await;
2665
2666 backend.delete_object("snapshots/old.json").await.unwrap();
2667 }
2668
2669 #[tokio::test]
2670 async fn test_delete_object_not_found() {
2671 let mock_server = MockServer::start().await;
2672 let config = create_test_config_with_endpoint(&mock_server.uri());
2673 let backend = LakeFSBackend::new(config).unwrap();
2674
2675 Mock::given(method("DELETE"))
2676 .and(path(
2677 "/api/v1/repositories/briefcase-test/branches/main/objects",
2678 ))
2679 .respond_with(ResponseTemplate::new(404))
2680 .expect(1)
2681 .mount(&mock_server)
2682 .await;
2683
2684 let result = backend.delete_object("nonexistent.json").await;
2685 assert!(matches!(result, Err(StorageError::NotFound(_))));
2686 }
2687
2688 #[tokio::test]
2689 async fn test_upload_object_success() {
2690 let mock_server = MockServer::start().await;
2691 let config = create_test_config_with_endpoint(&mock_server.uri());
2692 let backend = LakeFSBackend::new(config).unwrap();
2693
2694 Mock::given(method("PUT"))
2695 .and(path(
2696 "/api/v1/repositories/briefcase-test/branches/main/objects",
2697 ))
2698 .and(query_param("path", "test/file.json"))
2699 .respond_with(ResponseTemplate::new(201))
2700 .expect(1)
2701 .mount(&mock_server)
2702 .await;
2703
2704 backend
2705 .upload_object("test/file.json", b"hello world")
2706 .await
2707 .unwrap();
2708 }
2709
2710 #[tokio::test]
2711 async fn test_upload_object_server_error() {
2712 let mock_server = MockServer::start().await;
2713 let config = create_test_config_with_endpoint(&mock_server.uri());
2714 let backend = LakeFSBackend::new(config).unwrap();
2715
2716 Mock::given(method("PUT"))
2717 .and(path(
2718 "/api/v1/repositories/briefcase-test/branches/main/objects",
2719 ))
2720 .respond_with(ResponseTemplate::new(500).set_body_string("Internal Server Error"))
2721 .expect(1)
2722 .mount(&mock_server)
2723 .await;
2724
2725 let result = backend.upload_object("test/file.json", b"data").await;
2726 assert!(result.is_err());
2727 }
2728
2729 #[tokio::test]
2730 async fn test_download_object_success() {
2731 let mock_server = MockServer::start().await;
2732 let config = create_test_config_with_endpoint(&mock_server.uri());
2733 let backend = LakeFSBackend::new(config).unwrap();
2734
2735 Mock::given(method("GET"))
2736 .and(path(
2737 "/api/v1/repositories/briefcase-test/refs/main/objects",
2738 ))
2739 .and(query_param("path", "test/file.json"))
2740 .respond_with(ResponseTemplate::new(200).set_body_bytes(b"file content here".to_vec()))
2741 .expect(1)
2742 .mount(&mock_server)
2743 .await;
2744
2745 let data = backend.download_object("test/file.json").await.unwrap();
2746 assert_eq!(data, b"file content here");
2747 }
2748
2749 #[tokio::test]
2750 async fn test_download_object_not_found() {
2751 let mock_server = MockServer::start().await;
2752 let config = create_test_config_with_endpoint(&mock_server.uri());
2753 let backend = LakeFSBackend::new(config).unwrap();
2754
2755 Mock::given(method("GET"))
2756 .and(path(
2757 "/api/v1/repositories/briefcase-test/refs/main/objects",
2758 ))
2759 .respond_with(ResponseTemplate::new(404))
2760 .expect(1)
2761 .mount(&mock_server)
2762 .await;
2763
2764 let result = backend.download_object("missing.json").await;
2765 assert!(matches!(result, Err(StorageError::NotFound(_))));
2766 }
2767
2768 #[tokio::test]
2769 async fn test_create_commit_success() {
2770 let mock_server = MockServer::start().await;
2771 let config = create_test_config_with_endpoint(&mock_server.uri());
2772 let backend = LakeFSBackend::new(config).unwrap();
2773
2774 Mock::given(method("POST"))
2775 .and(path(
2776 "/api/v1/repositories/briefcase-test/branches/main/commits",
2777 ))
2778 .respond_with(ResponseTemplate::new(201).set_body_json(json!({
2779 "id": "commit-sha-xyz"
2780 })))
2781 .expect(1)
2782 .mount(&mock_server)
2783 .await;
2784
2785 let id = backend.create_commit("Test commit").await.unwrap();
2786 assert_eq!(id, "commit-sha-xyz");
2787 }
2788
2789 #[tokio::test]
2790 async fn test_list_objects_success() {
2791 let mock_server = MockServer::start().await;
2792 let config = create_test_config_with_endpoint(&mock_server.uri());
2793 let backend = LakeFSBackend::new(config).unwrap();
2794
2795 Mock::given(method("GET"))
2796 .and(path(
2797 "/api/v1/repositories/briefcase-test/refs/main/objects/ls",
2798 ))
2799 .and(query_param("prefix", "snapshots/"))
2800 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2801 "results": [
2802 {"path": "snapshots/a.json", "type": "object"},
2803 {"path": "snapshots/b.json", "type": "object"},
2804 {"path": "snapshots/", "type": "common_prefix"}
2805 ]
2806 })))
2807 .expect(1)
2808 .mount(&mock_server)
2809 .await;
2810
2811 let paths = backend.list_objects("snapshots/").await.unwrap();
2812 assert_eq!(paths.len(), 2);
2813 assert!(paths.contains(&"snapshots/a.json".to_string()));
2814 assert!(paths.contains(&"snapshots/b.json".to_string()));
2815 }
2816
2817 #[tokio::test]
2818 async fn test_health_check_healthy() {
2819 let mock_server = MockServer::start().await;
2820 let config = create_test_config_with_endpoint(&mock_server.uri());
2821 let backend = LakeFSBackend::new(config).unwrap();
2822
2823 Mock::given(method("GET"))
2824 .and(path("/api/v1/repositories/briefcase-test"))
2825 .respond_with(ResponseTemplate::new(200))
2826 .expect(1)
2827 .mount(&mock_server)
2828 .await;
2829
2830 assert!(backend.health_check().await.unwrap());
2831 }
2832
2833 #[tokio::test]
2834 async fn test_health_check_unhealthy() {
2835 let mock_server = MockServer::start().await;
2836 let config = create_test_config_with_endpoint(&mock_server.uri());
2837 let backend = LakeFSBackend::new(config).unwrap();
2838
2839 Mock::given(method("GET"))
2840 .and(path("/api/v1/repositories/briefcase-test"))
2841 .respond_with(ResponseTemplate::new(503))
2842 .expect(1)
2843 .mount(&mock_server)
2844 .await;
2845
2846 assert!(!backend.health_check().await.unwrap());
2847 }
2848
2849 #[tokio::test]
2850 async fn test_delete_via_trait_success() {
2851 let mock_server = MockServer::start().await;
2852 let config = create_test_config_with_endpoint(&mock_server.uri());
2853 let backend = LakeFSBackend::new(config).unwrap();
2854
2855 Mock::given(method("DELETE"))
2856 .and(path(
2857 "/api/v1/repositories/briefcase-test/branches/main/objects",
2858 ))
2859 .respond_with(ResponseTemplate::new(204))
2860 .expect(1)
2861 .mount(&mock_server)
2862 .await;
2863
2864 let deleted = backend.delete("some-snapshot-id").await.unwrap();
2865 assert!(deleted);
2866 }
2867
2868 #[tokio::test]
2869 async fn test_delete_via_trait_not_found() {
2870 let mock_server = MockServer::start().await;
2871 let config = create_test_config_with_endpoint(&mock_server.uri());
2872 let backend = LakeFSBackend::new(config).unwrap();
2873
2874 Mock::given(method("DELETE"))
2875 .and(path(
2876 "/api/v1/repositories/briefcase-test/branches/main/objects",
2877 ))
2878 .respond_with(ResponseTemplate::new(404))
2879 .expect(1)
2880 .mount(&mock_server)
2881 .await;
2882
2883 let deleted = backend.delete("nonexistent").await.unwrap();
2884 assert!(!deleted);
2885 }
2886
2887 #[test]
2893 fn test_lakefs_auth_enum_basic() {
2894 let auth = LakeFSAuth::Basic {
2895 access_key: "ak".into(),
2896 secret_key: "sk".into(),
2897 };
2898 match auth {
2899 LakeFSAuth::Basic {
2900 access_key,
2901 secret_key,
2902 } => {
2903 assert_eq!(access_key, "ak");
2904 assert_eq!(secret_key, "sk");
2905 }
2906 _ => panic!("Expected Basic"),
2907 }
2908 }
2909
2910 #[test]
2911 fn test_lakefs_auth_enum_all_variants() {
2912 let _basic = LakeFSAuth::Basic {
2914 access_key: "a".into(),
2915 secret_key: "s".into(),
2916 };
2917 let _token = LakeFSAuth::Token {
2918 access_key: "a".into(),
2919 secret_key: "s".into(),
2920 };
2921 let _sts = LakeFSAuth::Sts {
2922 oidc_token: "tok".into(),
2923 };
2924 let _iam = LakeFSAuth::Iam;
2925 let _oidc = LakeFSAuth::Oidc {
2926 token: "tok".into(),
2927 };
2928 let _saml = LakeFSAuth::Saml {
2929 assertion: "xml".into(),
2930 };
2931 }
2932
2933 #[test]
2934 fn test_lakefs_config_with_auth() {
2935 let config = LakeFSConfig::new("http://localhost", "repo", "main", "ak", "sk").with_auth(
2936 LakeFSAuth::Token {
2937 access_key: "ak2".into(),
2938 secret_key: "sk2".into(),
2939 },
2940 );
2941 assert!(config.auth.is_some());
2942 match config.auth.unwrap() {
2943 LakeFSAuth::Token {
2944 access_key,
2945 secret_key,
2946 } => {
2947 assert_eq!(access_key, "ak2");
2948 assert_eq!(secret_key, "sk2");
2949 }
2950 _ => panic!("Expected Token"),
2951 }
2952 }
2953
2954 #[test]
2955 fn test_lakefs_config_without_auth_defaults_to_none() {
2956 let config = LakeFSConfig::new("http://localhost", "repo", "main", "ak", "sk");
2957 assert!(config.auth.is_none());
2958 }
2959
2960 #[tokio::test]
2961 async fn test_basic_auth_provider_header() {
2962 let provider = BasicAuthProvider::new(
2963 "AKIAIOSFODNN7EXAMPLE",
2964 "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
2965 )
2966 .unwrap();
2967 let header = provider.auth_header().await.unwrap();
2968 let header_str = header.to_str().unwrap();
2969 assert!(header_str.starts_with("Basic "));
2970 let b64_part = &header_str["Basic ".len()..];
2972 let decoded = general_purpose::STANDARD.decode(b64_part).unwrap();
2973 let decoded_str = String::from_utf8(decoded).unwrap();
2974 assert_eq!(
2975 decoded_str,
2976 "AKIAIOSFODNN7EXAMPLE:wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
2977 );
2978 }
2979
2980 #[tokio::test]
2981 async fn test_basic_auth_provider_mode_name() {
2982 let provider = BasicAuthProvider::new("ak", "sk").unwrap();
2983 assert_eq!(provider.mode_name(), "basic");
2984 }
2985
2986 #[tokio::test]
2987 async fn test_token_auth_provider_login_success() {
2988 let mock_server = MockServer::start().await;
2989
2990 Mock::given(method("POST"))
2992 .and(path("/api/v1/auth/login"))
2993 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2994 "token": "eyJhbGciOiJIUzI1NiJ9.test",
2995 "token_expiration": 9999999999u64
2996 })))
2997 .expect(1)
2998 .mount(&mock_server)
2999 .await;
3000
3001 let provider = TokenAuthProvider::new(&mock_server.uri(), "ak", "sk");
3002 let header = provider.auth_header().await.unwrap();
3003 let header_str = header.to_str().unwrap();
3004 assert_eq!(header_str, "Bearer eyJhbGciOiJIUzI1NiJ9.test");
3005 assert_eq!(provider.mode_name(), "token");
3006 }
3007
3008 #[tokio::test]
3009 async fn test_token_auth_provider_caches_token() {
3010 let mock_server = MockServer::start().await;
3011
3012 Mock::given(method("POST"))
3013 .and(path("/api/v1/auth/login"))
3014 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
3015 "token": "cached-token",
3016 "token_expiration": 9999999999u64
3017 })))
3018 .expect(1) .mount(&mock_server)
3020 .await;
3021
3022 let provider = TokenAuthProvider::new(&mock_server.uri(), "ak", "sk");
3023
3024 let h1 = provider.auth_header().await.unwrap();
3026 let h2 = provider.auth_header().await.unwrap();
3028 assert_eq!(h1, h2);
3029 }
3030
3031 #[tokio::test]
3032 async fn test_token_auth_provider_login_failure() {
3033 let mock_server = MockServer::start().await;
3034
3035 Mock::given(method("POST"))
3036 .and(path("/api/v1/auth/login"))
3037 .respond_with(ResponseTemplate::new(401).set_body_json(json!({
3038 "message": "invalid credentials"
3039 })))
3040 .expect(1)
3041 .mount(&mock_server)
3042 .await;
3043
3044 let provider = TokenAuthProvider::new(&mock_server.uri(), "bad", "creds");
3045 let result = provider.auth_header().await;
3046 assert!(result.is_err());
3047 let err_msg = format!("{}", result.unwrap_err());
3048 assert!(err_msg.contains("401"));
3049 }
3050
3051 #[tokio::test]
3052 async fn test_sts_auth_provider_success() {
3053 let mock_server = MockServer::start().await;
3054
3055 Mock::given(method("POST"))
3056 .and(path("/sts/login"))
3057 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
3058 "Credentials": {
3059 "AccessKeyId": "ASIATEMP",
3060 "SecretAccessKey": "tempSecret",
3061 "SessionToken": "FwoGZXIvY...",
3062 "Expiration": 9999999999u64
3063 }
3064 })))
3065 .expect(1)
3066 .mount(&mock_server)
3067 .await;
3068
3069 let provider = StsAuthProvider::new(&mock_server.uri(), "my-oidc-token");
3070 let header = provider.auth_header().await.unwrap();
3071 let header_str = header.to_str().unwrap();
3072 assert!(header_str.starts_with("Basic "));
3073 let b64_part = &header_str["Basic ".len()..];
3075 let decoded = general_purpose::STANDARD.decode(b64_part).unwrap();
3076 let decoded_str = String::from_utf8(decoded).unwrap();
3077 assert_eq!(decoded_str, "ASIATEMP:tempSecret");
3078 assert_eq!(provider.mode_name(), "sts");
3079 }
3080
3081 #[tokio::test]
3082 async fn test_oidc_auth_provider_success() {
3083 let mock_server = MockServer::start().await;
3084
3085 Mock::given(method("POST"))
3086 .and(path("/api/v1/oidc/login"))
3087 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
3088 "token": "oidc-lakefs-jwt-token",
3089 "token_expiration": 9999999999u64
3090 })))
3091 .expect(1)
3092 .mount(&mock_server)
3093 .await;
3094
3095 let provider = OidcAuthProvider::new(&mock_server.uri(), "external-oidc-token");
3096 let header = provider.auth_header().await.unwrap();
3097 let header_str = header.to_str().unwrap();
3098 assert_eq!(header_str, "Bearer oidc-lakefs-jwt-token");
3099 assert_eq!(provider.mode_name(), "oidc");
3100 }
3101
3102 #[tokio::test]
3103 async fn test_saml_auth_provider_success() {
3104 let mock_server = MockServer::start().await;
3105
3106 Mock::given(method("POST"))
3107 .and(path("/api/v1/auth/external/saml"))
3108 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
3109 "token": "saml-lakefs-jwt-token",
3110 "token_expiration": 9999999999u64
3111 })))
3112 .expect(1)
3113 .mount(&mock_server)
3114 .await;
3115
3116 let provider = SamlAuthProvider::new(&mock_server.uri(), "PHNhbWxSZXNwb25zZT4=");
3117 let header = provider.auth_header().await.unwrap();
3118 let header_str = header.to_str().unwrap();
3119 assert_eq!(header_str, "Bearer saml-lakefs-jwt-token");
3120 assert_eq!(provider.mode_name(), "saml");
3121 }
3122
3123 #[tokio::test]
3124 async fn test_iam_auth_provider_mode_name() {
3125 let provider = IamAuthProvider::new();
3126 assert_eq!(provider.mode_name(), "iam");
3127 }
3128
3129 #[tokio::test]
3130 async fn test_build_auth_provider_basic() {
3131 let auth = LakeFSAuth::Basic {
3132 access_key: "ak".into(),
3133 secret_key: "sk".into(),
3134 };
3135 let provider = build_auth_provider(&auth, "http://localhost").unwrap();
3136 assert_eq!(provider.mode_name(), "basic");
3137 let header = provider.auth_header().await.unwrap();
3138 assert!(header.to_str().unwrap().starts_with("Basic "));
3139 }
3140
3141 #[tokio::test]
3142 async fn test_build_auth_provider_token() {
3143 let auth = LakeFSAuth::Token {
3144 access_key: "ak".into(),
3145 secret_key: "sk".into(),
3146 };
3147 let provider = build_auth_provider(&auth, "http://localhost").unwrap();
3148 assert_eq!(provider.mode_name(), "token");
3149 }
3150
3151 #[tokio::test]
3152 async fn test_build_auth_provider_iam() {
3153 let auth = LakeFSAuth::Iam;
3154 let provider = build_auth_provider(&auth, "http://localhost").unwrap();
3155 assert_eq!(provider.mode_name(), "iam");
3156 }
3157
3158 #[tokio::test]
3159 async fn test_backend_with_basic_auth_config() {
3160 let mock_server = MockServer::start().await;
3161
3162 let config = LakeFSConfig::new(
3163 mock_server.uri(),
3164 "test-repo",
3165 "main",
3166 "test_key",
3167 "test_secret",
3168 )
3169 .with_auth(LakeFSAuth::Basic {
3170 access_key: "test_key".into(),
3171 secret_key: "test_secret".into(),
3172 });
3173
3174 let backend = LakeFSBackend::new(config).unwrap();
3175 assert_eq!(backend.auth_mode(), "basic");
3176 }
3177
3178 #[tokio::test]
3179 async fn test_backend_with_token_auth_sends_bearer() {
3180 let mock_server = MockServer::start().await;
3181
3182 Mock::given(method("POST"))
3184 .and(path("/api/v1/auth/login"))
3185 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
3186 "token": "test-jwt-token",
3187 "token_expiration": 9999999999u64
3188 })))
3189 .mount(&mock_server)
3190 .await;
3191
3192 Mock::given(method("GET"))
3194 .and(path("/api/v1/repositories/test-repo"))
3195 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
3196 "id": "test-repo",
3197 "storage_namespace": "s3://test",
3198 "default_branch": "main"
3199 })))
3200 .mount(&mock_server)
3201 .await;
3202
3203 let config = LakeFSConfig::new(mock_server.uri(), "test-repo", "main", "ak", "sk")
3204 .with_auth(LakeFSAuth::Token {
3205 access_key: "ak".into(),
3206 secret_key: "sk".into(),
3207 });
3208
3209 let backend = LakeFSBackend::new(config).unwrap();
3210 assert_eq!(backend.auth_mode(), "token");
3211
3212 let health = backend.health_check().await.unwrap();
3214 assert!(health);
3215 }
3216
3217 #[tokio::test]
3218 async fn test_backend_default_auth_is_basic() {
3219 let config = LakeFSConfig::new("http://localhost:8000", "test-repo", "main", "ak", "sk");
3220 let backend = LakeFSBackend::new(config).unwrap();
3222 assert_eq!(backend.auth_mode(), "basic");
3223 }
3224}