1use super::{FlushResult, SnapshotQuery, StorageBackend, StorageError};
2use crate::models::{DecisionSnapshot, Snapshot};
3#[cfg(feature = "networking")]
4use base64::{engine::general_purpose, Engine as _};
5#[cfg(feature = "networking")]
6use reqwest::{
7 header::{HeaderMap, HeaderValue, AUTHORIZATION, CONTENT_TYPE},
8 Client, RequestBuilder, Response,
9};
10use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12use std::sync::{Arc, Mutex};
13
/// Authentication modes that can be selected for a lakeFS connection.
///
/// Each variant carries the credential material handed to the matching
/// provider by `build_auth_provider`.
///
/// `Debug` is implemented by hand instead of derived so that secret keys,
/// tokens and assertions are never written to logs or panic messages; only
/// the variant name and the (non-secret) access key id are shown.
#[derive(Clone)]
pub enum LakeFSAuth {
    /// Static key pair sent as an HTTP `Basic` authorization header.
    Basic {
        access_key: String,
        secret_key: String,
    },

    /// Key pair exchanged at the login endpoint for a bearer token.
    Token {
        access_key: String,
        secret_key: String,
    },

    /// OIDC token exchanged at the STS login endpoint for temporary credentials.
    Sts {
        oidc_token: String,
    },

    /// Credentials discovered from the container / instance metadata services.
    Iam,

    /// External token exchanged at the OIDC login endpoint for a bearer token.
    Oidc {
        token: String,
    },

    /// SAML assertion exchanged at the SAML endpoint for a bearer token.
    Saml {
        assertion: String,
    },
}

impl std::fmt::Debug for LakeFSAuth {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Placeholder printed in place of every secret value.
        const REDACTED: &str = "<redacted>";
        match self {
            Self::Basic { access_key, .. } => f
                .debug_struct("Basic")
                .field("access_key", access_key)
                .field("secret_key", &REDACTED)
                .finish(),
            Self::Token { access_key, .. } => f
                .debug_struct("Token")
                .field("access_key", access_key)
                .field("secret_key", &REDACTED)
                .finish(),
            Self::Sts { .. } => f
                .debug_struct("Sts")
                .field("oidc_token", &REDACTED)
                .finish(),
            Self::Iam => f.write_str("Iam"),
            Self::Oidc { .. } => f.debug_struct("Oidc").field("token", &REDACTED).finish(),
            Self::Saml { .. } => f
                .debug_struct("Saml")
                .field("assertion", &REDACTED)
                .finish(),
        }
    }
}
70
/// Source of `Authorization` header values for outgoing requests.
///
/// Implementations must be shareable across threads (`Send + Sync`).
#[cfg(all(feature = "async", feature = "networking"))]
#[async_trait::async_trait]
pub(crate) trait AuthProvider: Send + Sync {
    /// Produces the header value to attach to the next request, or a
    /// `StorageError` when no valid header can be obtained.
    async fn auth_header(&self) -> Result<HeaderValue, StorageError>;

    /// Short static label identifying the authentication mode.
    fn mode_name(&self) -> &'static str;
}
90
/// Provider that hands out one fixed, pre-built `Basic` header.
#[cfg(all(feature = "async", feature = "networking"))]
pub(crate) struct BasicAuthProvider {
    /// Header value computed once by `BasicAuthProvider::new`.
    header: HeaderValue,
}
97
#[cfg(all(feature = "async", feature = "networking"))]
impl BasicAuthProvider {
    /// Builds the provider from a key pair.
    ///
    /// The pair is joined as `access_key:secret_key`, base64-encoded with the
    /// standard alphabet and prefixed with `Basic `.
    ///
    /// # Errors
    /// Returns `StorageError::ConnectionError` when the resulting text is not
    /// a valid HTTP header value.
    pub fn new(access_key: &str, secret_key: &str) -> Result<Self, StorageError> {
        let pair = format!("{}:{}", access_key, secret_key);
        let header_text = format!(
            "Basic {}",
            general_purpose::STANDARD.encode(pair.as_bytes())
        );
        match HeaderValue::from_str(&header_text) {
            Ok(header) => Ok(Self { header }),
            Err(e) => Err(StorageError::ConnectionError(format!(
                "Invalid Basic auth header: {}",
                e
            ))),
        }
    }
}
110
#[cfg(all(feature = "async", feature = "networking"))]
#[async_trait::async_trait]
impl AuthProvider for BasicAuthProvider {
    /// Returns a copy of the header built at construction time; never fails.
    async fn auth_header(&self) -> Result<HeaderValue, StorageError> {
        let header = self.header.clone();
        Ok(header)
    }

    /// Always `"basic"`.
    fn mode_name(&self) -> &'static str {
        "basic"
    }
}
121
/// Provider that logs in with a key pair and caches the returned bearer token.
#[cfg(all(feature = "async", feature = "networking"))]
pub(crate) struct TokenAuthProvider {
    /// Base URL with trailing slashes removed.
    endpoint: String,
    /// Key id sent in the login request.
    access_key: String,
    /// Secret sent in the login request.
    secret_key: String,
    /// Dedicated HTTP client used only for the login call.
    client: Client,
    /// Cached `(token, expiry)`; expiry is compared against seconds since the Unix epoch.
    cache: Mutex<Option<(String, u64)>>,
}
133
#[cfg(all(feature = "async", feature = "networking"))]
impl TokenAuthProvider {
    /// Creates a provider with an empty token cache.
    ///
    /// Trailing slashes are stripped from `endpoint`. The internal client uses
    /// a 10 second timeout and falls back to the default client if the builder
    /// fails.
    pub fn new(endpoint: &str, access_key: &str, secret_key: &str) -> Self {
        let builder = Client::builder().timeout(std::time::Duration::from_secs(10));
        Self {
            endpoint: endpoint.trim_end_matches('/').to_string(),
            access_key: access_key.to_owned(),
            secret_key: secret_key.to_owned(),
            client: builder.build().unwrap_or_default(),
            cache: Mutex::new(None),
        }
    }

    /// Posts the key pair as JSON to `{endpoint}/api/v1/auth/login` and
    /// returns `(token, expiry)`.
    ///
    /// When the response carries no `token_expiration`, the expiry defaults to
    /// one hour from now.
    ///
    /// # Errors
    /// `StorageError::ConnectionError` on transport failure, a non-success
    /// status, or an unparsable response body.
    async fn fetch_token(&self) -> Result<(String, u64), StorageError> {
        /// JSON body of the login request (borrowed to avoid copying the keys).
        #[derive(Serialize)]
        struct LoginRequest<'a> {
            access_key_id: &'a str,
            secret_access_key: &'a str,
        }

        /// Subset of the login response that is consumed here.
        #[derive(Deserialize)]
        struct LoginResponse {
            token: String,
            #[serde(default)]
            token_expiration: Option<u64>,
        }

        let payload = LoginRequest {
            access_key_id: &self.access_key,
            secret_access_key: &self.secret_key,
        };
        let login_url = format!("{}/api/v1/auth/login", self.endpoint);

        let reply = match self.client.post(&login_url).json(&payload).send().await {
            Ok(reply) => reply,
            Err(e) => {
                return Err(StorageError::ConnectionError(format!(
                    "Token login failed: {}",
                    e
                )))
            }
        };

        let status = reply.status();
        if !status.is_success() {
            let text = reply.text().await.unwrap_or_default();
            return Err(StorageError::ConnectionError(format!(
                "Token login returned {}: {}",
                status, text
            )));
        }

        let parsed: LoginResponse = match reply.json().await {
            Ok(parsed) => parsed,
            Err(e) => {
                return Err(StorageError::ConnectionError(format!(
                    "Failed to parse login response: {}",
                    e
                )))
            }
        };

        // Fall back to "one hour from now" when the server gives no expiry.
        let expires_at = parsed.token_expiration.unwrap_or_else(|| {
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_secs()
                + 3600
        });

        Ok((parsed.token, expires_at))
    }
}
202
#[cfg(all(feature = "async", feature = "networking"))]
#[async_trait::async_trait]
impl AuthProvider for TokenAuthProvider {
    /// Returns `Bearer <token>`, logging in again first when no token is
    /// cached or the cached one expires within the next 60 seconds.
    ///
    /// # Panics
    /// Panics if the cache mutex has been poisoned.
    async fn auth_header(&self) -> Result<HeaderValue, StorageError> {
        // Decide on staleness inside a short scope so the guard is released
        // before any `.await`.
        let stale = {
            let guard = self.cache.lock().unwrap();
            let verdict = match guard.as_ref() {
                None => true,
                Some((_, expires_at)) => {
                    let now = std::time::SystemTime::now()
                        .duration_since(std::time::UNIX_EPOCH)
                        .unwrap_or_default()
                        .as_secs();
                    now + 60 >= *expires_at
                }
            };
            verdict
        };

        if stale {
            let fresh = self.fetch_token().await?;
            *self.cache.lock().unwrap() = Some(fresh);
        }

        let guard = self.cache.lock().unwrap();
        let (token, _) = guard.as_ref().unwrap();
        HeaderValue::from_str(&format!("Bearer {}", token))
            .map_err(|e| StorageError::ConnectionError(format!("Invalid Bearer header: {}", e)))
    }

    /// Always `"token"`.
    fn mode_name(&self) -> &'static str {
        "token"
    }
}
239
/// Provider that trades an OIDC token for temporary credentials and caches them.
#[cfg(all(feature = "async", feature = "networking"))]
pub(crate) struct StsAuthProvider {
    /// Base URL with trailing slashes removed.
    endpoint: String,
    /// Web identity token sent in the STS login request.
    oidc_token: String,
    /// Dedicated HTTP client used only for the STS call.
    client: Client,
    /// Cached `(access key id, secret access key, session token, expiry)`;
    /// expiry is compared against seconds since the Unix epoch.
    cache: Mutex<Option<(String, String, String, u64)>>,
}
250
#[cfg(all(feature = "async", feature = "networking"))]
impl StsAuthProvider {
    /// Creates a provider with an empty credential cache.
    ///
    /// Trailing slashes are stripped from `endpoint`. The internal client uses
    /// a 10 second timeout and falls back to the default client if the builder
    /// fails. `oidc_token` must be the raw (not URL-encoded) token.
    pub fn new(endpoint: &str, oidc_token: &str) -> Self {
        let client = Client::builder()
            .timeout(std::time::Duration::from_secs(10))
            .build()
            .unwrap_or_default();
        Self {
            endpoint: endpoint.trim_end_matches('/').to_string(),
            oidc_token: oidc_token.to_string(),
            client,
            cache: Mutex::new(None),
        }
    }

    /// Posts an `AssumeRoleWithWebIdentity` form to `{endpoint}/sts/login` and
    /// returns `(access key id, secret access key, session token, expiry)`.
    ///
    /// When the response carries no `Expiration`, the expiry defaults to one
    /// hour from now.
    ///
    /// # Errors
    /// `StorageError::ConnectionError` on transport failure, a non-success
    /// status, or an unparsable response body.
    async fn fetch_temp_credentials(&self) -> Result<(String, String, String, u64), StorageError> {
        /// Percent-encodes a value for an `application/x-www-form-urlencoded`
        /// body; RFC 3986 unreserved characters pass through unchanged.
        fn form_encode(raw: &str) -> String {
            let mut out = String::with_capacity(raw.len());
            for byte in raw.bytes() {
                match byte {
                    b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'.' | b'_' | b'~' => {
                        out.push(byte as char)
                    }
                    other => out.push_str(&format!("%{:02X}", other)),
                }
            }
            out
        }

        #[derive(Deserialize)]
        struct StsResponse {
            #[serde(rename = "Credentials")]
            credentials: StsCredentials,
        }

        #[derive(Deserialize)]
        struct StsCredentials {
            #[serde(rename = "AccessKeyId")]
            access_key_id: String,
            #[serde(rename = "SecretAccessKey")]
            secret_access_key: String,
            #[serde(rename = "SessionToken")]
            session_token: String,
            #[serde(rename = "Expiration", default)]
            expiration: Option<u64>,
        }

        let url = format!("{}/sts/login", self.endpoint);

        // The token is encoded so that characters such as `+`, `&` or `=` are
        // not misread by the form decoder on the server side.
        let form_body = format!(
            "Action=AssumeRoleWithWebIdentity&WebIdentityToken={}&Version=2011-06-15",
            form_encode(&self.oidc_token)
        );

        let resp = self
            .client
            .post(&url)
            .header("Content-Type", "application/x-www-form-urlencoded")
            .body(form_body)
            .send()
            .await
            .map_err(|e| StorageError::ConnectionError(format!("STS login failed: {}", e)))?;

        if !resp.status().is_success() {
            let status = resp.status();
            let text = resp.text().await.unwrap_or_default();
            return Err(StorageError::ConnectionError(format!(
                "STS login returned {}: {}",
                status, text
            )));
        }

        let sts: StsResponse = resp.json().await.map_err(|e| {
            StorageError::ConnectionError(format!("Failed to parse STS response: {}", e))
        })?;

        let now_secs = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        let expiry = sts.credentials.expiration.unwrap_or(now_secs + 3600);

        Ok((
            sts.credentials.access_key_id,
            sts.credentials.secret_access_key,
            sts.credentials.session_token,
            expiry,
        ))
    }
}
326
#[cfg(all(feature = "async", feature = "networking"))]
#[async_trait::async_trait]
impl AuthProvider for StsAuthProvider {
    /// Returns a `Basic` header built from the temporary key pair, fetching
    /// new credentials first when none are cached or the cached ones expire
    /// within the next 60 seconds. The session token is cached but not sent.
    ///
    /// # Panics
    /// Panics if the cache mutex has been poisoned.
    async fn auth_header(&self) -> Result<HeaderValue, StorageError> {
        // Guard is scoped so it is released before any `.await`.
        let stale = {
            let guard = self.cache.lock().unwrap();
            let verdict = match guard.as_ref() {
                None => true,
                Some((_, _, _, expires_at)) => {
                    let now = std::time::SystemTime::now()
                        .duration_since(std::time::UNIX_EPOCH)
                        .unwrap_or_default()
                        .as_secs();
                    now + 60 >= *expires_at
                }
            };
            verdict
        };

        if stale {
            let fresh = self.fetch_temp_credentials().await?;
            *self.cache.lock().unwrap() = Some(fresh);
        }

        let guard = self.cache.lock().unwrap();
        let (key_id, secret, _session_token, _) = guard.as_ref().unwrap();
        let pair = format!("{}:{}", key_id, secret);
        let header_text = format!(
            "Basic {}",
            general_purpose::STANDARD.encode(pair.as_bytes())
        );
        HeaderValue::from_str(&header_text)
            .map_err(|e| StorageError::ConnectionError(format!("Invalid STS auth header: {}", e)))
    }

    /// Always `"sts"`.
    fn mode_name(&self) -> &'static str {
        "sts"
    }
}
367
/// Provider that obtains credentials from the container or instance
/// metadata endpoints and caches them.
#[cfg(all(feature = "async", feature = "networking"))]
pub(crate) struct IamAuthProvider {
    /// HTTP client used for the metadata calls.
    client: Client,
    /// Most recently fetched credentials, if any.
    cache: Mutex<Option<IamCredentialCache>>,
}

/// Cached IAM credentials: `(access key id, secret access key, optional
/// session token, expiry)`; expiry is compared against seconds since the
/// Unix epoch.
#[cfg(all(feature = "async", feature = "networking"))]
type IamCredentialCache = (String, String, Option<String>, u64);
379
#[cfg(all(feature = "async", feature = "networking"))]
impl IamAuthProvider {
    /// Creates a provider with an empty credential cache.
    ///
    /// The internal client uses a 5 second timeout and falls back to the
    /// default client if the builder fails.
    pub fn new() -> Self {
        let client = Client::builder()
            .timeout(std::time::Duration::from_secs(5))
            .build()
            .unwrap_or_default();
        Self {
            client,
            cache: Mutex::new(None),
        }
    }

    /// Converts fetched credentials into a cache entry that is treated as
    /// valid for one hour from now.
    fn into_cache_entry(creds: IamCredentials) -> IamCredentialCache {
        let now_secs = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        (
            creds.access_key_id,
            creds.secret_access_key,
            creds.token,
            now_secs + 3600,
        )
    }

    /// Fetches credentials, trying the container endpoint first and the
    /// instance metadata service second.
    ///
    /// The container lookup (enabled by `AWS_CONTAINER_CREDENTIALS_RELATIVE_URI`)
    /// is best-effort: any failure there falls through to the instance
    /// metadata flow (session token, then role name, then credentials).
    ///
    /// # Errors
    /// `StorageError::ConnectionError` when any instance-metadata step fails
    /// at the transport level, answers with a non-success status, reports no
    /// role, or returns an unparsable credentials document.
    async fn fetch_credentials(&self) -> Result<IamCredentialCache, StorageError> {
        if let Ok(creds_uri) = std::env::var("AWS_CONTAINER_CREDENTIALS_RELATIVE_URI") {
            let url = format!("http://169.254.170.2{}", creds_uri);
            if let Ok(resp) = self.client.get(&url).send().await {
                if resp.status().is_success() {
                    if let Ok(creds) = resp.json::<IamCredentials>().await {
                        return Ok(Self::into_cache_entry(creds));
                    }
                }
            }
        }

        // Every metadata response is status-checked: without this an error
        // page body would be used verbatim as the token or the role name.
        let token_resp = self
            .client
            .put("http://169.254.169.254/latest/api/token")
            .header("X-aws-ec2-metadata-token-ttl-seconds", "21600")
            .send()
            .await
            .and_then(|resp| resp.error_for_status())
            .map_err(|e| {
                StorageError::ConnectionError(format!("IAM: Failed to fetch IMDS token: {}", e))
            })?;

        let imds_token = token_resp.text().await.map_err(|e| {
            StorageError::ConnectionError(format!("IAM: Failed to read IMDS token: {}", e))
        })?;

        let role_resp = self
            .client
            .get("http://169.254.169.254/latest/meta-data/iam/security-credentials/")
            .header("X-aws-ec2-metadata-token", &imds_token)
            .send()
            .await
            .and_then(|resp| resp.error_for_status())
            .map_err(|e| {
                StorageError::ConnectionError(format!("IAM: Failed to fetch role name: {}", e))
            })?;

        let role_listing = role_resp.text().await.map_err(|e| {
            StorageError::ConnectionError(format!("IAM: Failed to read role name: {}", e))
        })?;
        // Use the first non-empty line so a multi-line listing cannot produce
        // a URL with embedded newlines.
        let role_name = role_listing
            .lines()
            .map(str::trim)
            .find(|line| !line.is_empty())
            .ok_or_else(|| {
                StorageError::ConnectionError(
                    "IAM: Failed to read role name: empty response".to_string(),
                )
            })?;

        let creds_url = format!(
            "http://169.254.169.254/latest/meta-data/iam/security-credentials/{}",
            role_name
        );
        let creds_resp = self
            .client
            .get(&creds_url)
            .header("X-aws-ec2-metadata-token", &imds_token)
            .send()
            .await
            .and_then(|resp| resp.error_for_status())
            .map_err(|e| {
                StorageError::ConnectionError(format!("IAM: Failed to fetch credentials: {}", e))
            })?;

        let creds: IamCredentials = creds_resp.json().await.map_err(|e| {
            StorageError::ConnectionError(format!("IAM: Failed to parse credentials: {}", e))
        })?;

        Ok(Self::into_cache_entry(creds))
    }
}
479
/// Credentials document returned by the metadata endpoints.
///
/// Field names are accepted in both `PascalCase` and `camelCase`; the session
/// token may appear as `Token`, `token` or `SessionToken`, or be absent.
#[cfg(all(feature = "async", feature = "networking"))]
#[derive(Deserialize)]
struct IamCredentials {
    /// Access key id.
    #[serde(rename = "AccessKeyId", alias = "accessKeyId")]
    access_key_id: String,
    /// Secret access key.
    #[serde(rename = "SecretAccessKey", alias = "secretAccessKey")]
    secret_access_key: String,
    /// Optional session token.
    #[serde(rename = "Token", alias = "token", alias = "SessionToken", default)]
    token: Option<String>,
}
490
#[cfg(all(feature = "async", feature = "networking"))]
#[async_trait::async_trait]
impl AuthProvider for IamAuthProvider {
    /// Returns a `Basic` header built from the discovered key pair, fetching
    /// new credentials first when none are cached or the cached ones expire
    /// within the next 60 seconds.
    ///
    /// # Panics
    /// Panics if the cache mutex has been poisoned.
    async fn auth_header(&self) -> Result<HeaderValue, StorageError> {
        // Guard is scoped so it is released before any `.await`.
        let stale = {
            let guard = self.cache.lock().unwrap();
            let verdict = match guard.as_ref() {
                None => true,
                Some((_, _, _, expires_at)) => {
                    let now = std::time::SystemTime::now()
                        .duration_since(std::time::UNIX_EPOCH)
                        .unwrap_or_default()
                        .as_secs();
                    now + 60 >= *expires_at
                }
            };
            verdict
        };

        if stale {
            let fresh = self.fetch_credentials().await?;
            *self.cache.lock().unwrap() = Some(fresh);
        }

        let guard = self.cache.lock().unwrap();
        let (key_id, secret, _, _) = guard.as_ref().unwrap();
        let pair = format!("{}:{}", key_id, secret);
        let header_text = format!(
            "Basic {}",
            general_purpose::STANDARD.encode(pair.as_bytes())
        );
        HeaderValue::from_str(&header_text)
            .map_err(|e| StorageError::ConnectionError(format!("Invalid IAM auth header: {}", e)))
    }

    /// Always `"iam"`.
    fn mode_name(&self) -> &'static str {
        "iam"
    }
}
528
/// Provider that exchanges an external token for a bearer token and caches it.
#[cfg(all(feature = "async", feature = "networking"))]
pub(crate) struct OidcAuthProvider {
    /// Base URL with trailing slashes removed.
    endpoint: String,
    /// Externally issued token sent as the `code` form field.
    external_token: String,
    /// Dedicated HTTP client used only for the exchange call.
    client: Client,
    /// Cached `(token, expiry)`; expiry is compared against seconds since the Unix epoch.
    cache: Mutex<Option<(String, u64)>>,
}
539
#[cfg(all(feature = "async", feature = "networking"))]
impl OidcAuthProvider {
    /// Creates a provider with an empty token cache.
    ///
    /// Trailing slashes are stripped from `endpoint`. The internal client uses
    /// a 10 second timeout and falls back to the default client if the builder
    /// fails. `external_token` must be the raw (not URL-encoded) value.
    pub fn new(endpoint: &str, external_token: &str) -> Self {
        let client = Client::builder()
            .timeout(std::time::Duration::from_secs(10))
            .build()
            .unwrap_or_default();
        Self {
            endpoint: endpoint.trim_end_matches('/').to_string(),
            external_token: external_token.to_string(),
            client,
            cache: Mutex::new(None),
        }
    }

    /// Posts the external token as the `code` form field to
    /// `{endpoint}/api/v1/oidc/login` and returns `(token, expiry)`.
    ///
    /// When the response carries no `token_expiration`, the expiry defaults to
    /// one hour from now.
    ///
    /// # Errors
    /// `StorageError::ConnectionError` on transport failure, a non-success
    /// status, or an unparsable response body.
    async fn exchange_token(&self) -> Result<(String, u64), StorageError> {
        /// Percent-encodes a value for an `application/x-www-form-urlencoded`
        /// body; RFC 3986 unreserved characters pass through unchanged.
        fn form_encode(raw: &str) -> String {
            let mut out = String::with_capacity(raw.len());
            for byte in raw.bytes() {
                match byte {
                    b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'.' | b'_' | b'~' => {
                        out.push(byte as char)
                    }
                    other => out.push_str(&format!("%{:02X}", other)),
                }
            }
            out
        }

        #[derive(Deserialize)]
        struct OidcResponse {
            token: String,
            #[serde(default)]
            token_expiration: Option<u64>,
        }

        let url = format!("{}/api/v1/oidc/login", self.endpoint);

        // Encoded so that `+`, `&`, `=` and similar characters in the token
        // survive form decoding on the server.
        let form_body = format!("code={}", form_encode(&self.external_token));

        let resp = self
            .client
            .post(&url)
            .header("Content-Type", "application/x-www-form-urlencoded")
            .body(form_body)
            .send()
            .await
            .map_err(|e| StorageError::ConnectionError(format!("OIDC login failed: {}", e)))?;

        if !resp.status().is_success() {
            let status = resp.status();
            let text = resp.text().await.unwrap_or_default();
            return Err(StorageError::ConnectionError(format!(
                "OIDC login returned {}: {}",
                status, text
            )));
        }

        let oidc: OidcResponse = resp.json().await.map_err(|e| {
            StorageError::ConnectionError(format!("Failed to parse OIDC response: {}", e))
        })?;

        let now_secs = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        let expiry = oidc.token_expiration.unwrap_or(now_secs + 3600);

        Ok((oidc.token, expiry))
    }
}
596
#[cfg(all(feature = "async", feature = "networking"))]
#[async_trait::async_trait]
impl AuthProvider for OidcAuthProvider {
    /// Returns `Bearer <token>`, repeating the exchange first when no token
    /// is cached or the cached one expires within the next 60 seconds.
    ///
    /// # Panics
    /// Panics if the cache mutex has been poisoned.
    async fn auth_header(&self) -> Result<HeaderValue, StorageError> {
        // Guard is scoped so it is released before any `.await`.
        let stale = {
            let guard = self.cache.lock().unwrap();
            let verdict = match guard.as_ref() {
                None => true,
                Some((_, expires_at)) => {
                    let now = std::time::SystemTime::now()
                        .duration_since(std::time::UNIX_EPOCH)
                        .unwrap_or_default()
                        .as_secs();
                    now + 60 >= *expires_at
                }
            };
            verdict
        };

        if stale {
            let fresh = self.exchange_token().await?;
            *self.cache.lock().unwrap() = Some(fresh);
        }

        let guard = self.cache.lock().unwrap();
        let (token, _) = guard.as_ref().unwrap();
        match HeaderValue::from_str(&format!("Bearer {}", token)) {
            Ok(header) => Ok(header),
            Err(e) => Err(StorageError::ConnectionError(format!(
                "Invalid OIDC Bearer header: {}",
                e
            ))),
        }
    }

    /// Always `"oidc"`.
    fn mode_name(&self) -> &'static str {
        "oidc"
    }
}
633
/// Provider that exchanges a SAML assertion for a bearer token and caches it.
#[cfg(all(feature = "async", feature = "networking"))]
pub(crate) struct SamlAuthProvider {
    /// Base URL with trailing slashes removed.
    endpoint: String,
    /// Assertion sent as the `SAMLResponse` form field.
    assertion: String,
    /// Dedicated HTTP client used only for the exchange call.
    client: Client,
    /// Cached `(token, expiry)`; expiry is compared against seconds since the Unix epoch.
    cache: Mutex<Option<(String, u64)>>,
}
644
#[cfg(all(feature = "async", feature = "networking"))]
impl SamlAuthProvider {
    /// Creates a provider with an empty token cache.
    ///
    /// Trailing slashes are stripped from `endpoint`. The internal client uses
    /// a 10 second timeout and falls back to the default client if the builder
    /// fails. `assertion` must be the raw (not URL-encoded) value.
    pub fn new(endpoint: &str, assertion: &str) -> Self {
        let client = Client::builder()
            .timeout(std::time::Duration::from_secs(10))
            .build()
            .unwrap_or_default();
        Self {
            endpoint: endpoint.trim_end_matches('/').to_string(),
            assertion: assertion.to_string(),
            client,
            cache: Mutex::new(None),
        }
    }

    /// Posts the assertion as the `SAMLResponse` form field to
    /// `{endpoint}/api/v1/auth/external/saml` and returns `(token, expiry)`.
    ///
    /// When the response carries no `token_expiration`, the expiry defaults to
    /// one hour from now.
    ///
    /// # Errors
    /// `StorageError::ConnectionError` on transport failure, a non-success
    /// status, or an unparsable response body.
    async fn exchange_assertion(&self) -> Result<(String, u64), StorageError> {
        /// Percent-encodes a value for an `application/x-www-form-urlencoded`
        /// body; RFC 3986 unreserved characters pass through unchanged.
        fn form_encode(raw: &str) -> String {
            let mut out = String::with_capacity(raw.len());
            for byte in raw.bytes() {
                match byte {
                    b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'.' | b'_' | b'~' => {
                        out.push(byte as char)
                    }
                    other => out.push_str(&format!("%{:02X}", other)),
                }
            }
            out
        }

        #[derive(Deserialize)]
        struct SamlResponse {
            token: String,
            #[serde(default)]
            token_expiration: Option<u64>,
        }

        let url = format!("{}/api/v1/auth/external/saml", self.endpoint);

        // A base64 assertion contains `+`, `/` and `=`; sent unencoded, a form
        // decoder turns `+` into a space and corrupts the payload.
        let form_body = format!("SAMLResponse={}", form_encode(&self.assertion));

        let resp = self
            .client
            .post(&url)
            .header("Content-Type", "application/x-www-form-urlencoded")
            .body(form_body)
            .send()
            .await
            .map_err(|e| StorageError::ConnectionError(format!("SAML login failed: {}", e)))?;

        if !resp.status().is_success() {
            let status = resp.status();
            let text = resp.text().await.unwrap_or_default();
            return Err(StorageError::ConnectionError(format!(
                "SAML login returned {}: {}",
                status, text
            )));
        }

        let saml: SamlResponse = resp.json().await.map_err(|e| {
            StorageError::ConnectionError(format!("Failed to parse SAML response: {}", e))
        })?;

        let now_secs = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        let expiry = saml.token_expiration.unwrap_or(now_secs + 3600);

        Ok((saml.token, expiry))
    }
}
701
#[cfg(all(feature = "async", feature = "networking"))]
#[async_trait::async_trait]
impl AuthProvider for SamlAuthProvider {
    /// Returns `Bearer <token>`, repeating the assertion exchange first when
    /// no token is cached or the cached one expires within the next 60 seconds.
    ///
    /// # Panics
    /// Panics if the cache mutex has been poisoned.
    async fn auth_header(&self) -> Result<HeaderValue, StorageError> {
        // Guard is scoped so it is released before any `.await`.
        let stale = {
            let guard = self.cache.lock().unwrap();
            let verdict = match guard.as_ref() {
                None => true,
                Some((_, expires_at)) => {
                    let now = std::time::SystemTime::now()
                        .duration_since(std::time::UNIX_EPOCH)
                        .unwrap_or_default()
                        .as_secs();
                    now + 60 >= *expires_at
                }
            };
            verdict
        };

        if stale {
            let fresh = self.exchange_assertion().await?;
            *self.cache.lock().unwrap() = Some(fresh);
        }

        let guard = self.cache.lock().unwrap();
        let (token, _) = guard.as_ref().unwrap();
        match HeaderValue::from_str(&format!("Bearer {}", token)) {
            Ok(header) => Ok(header),
            Err(e) => Err(StorageError::ConnectionError(format!(
                "Invalid SAML Bearer header: {}",
                e
            ))),
        }
    }

    /// Always `"saml"`.
    fn mode_name(&self) -> &'static str {
        "saml"
    }
}
738
/// Instantiates the provider matching the selected authentication mode.
///
/// `endpoint` is forwarded to the providers that call the server; the `Basic`
/// and `Iam` variants do not use it.
///
/// # Errors
/// Propagates the error from `BasicAuthProvider::new`; the other providers
/// are constructed infallibly.
#[cfg(all(feature = "async", feature = "networking"))]
fn build_auth_provider(
    auth: &LakeFSAuth,
    endpoint: &str,
) -> Result<Arc<dyn AuthProvider>, StorageError> {
    let provider: Arc<dyn AuthProvider> = match auth {
        LakeFSAuth::Basic {
            access_key,
            secret_key,
        } => Arc::new(BasicAuthProvider::new(access_key, secret_key)?),
        LakeFSAuth::Token {
            access_key,
            secret_key,
        } => Arc::new(TokenAuthProvider::new(endpoint, access_key, secret_key)),
        LakeFSAuth::Sts { oidc_token } => Arc::new(StsAuthProvider::new(endpoint, oidc_token)),
        LakeFSAuth::Iam => Arc::new(IamAuthProvider::new()),
        LakeFSAuth::Oidc { token } => Arc::new(OidcAuthProvider::new(endpoint, token)),
        LakeFSAuth::Saml { assertion } => Arc::new(SamlAuthProvider::new(endpoint, assertion)),
    };
    Ok(provider)
}
763
764#[derive(Debug, Clone, Serialize, Deserialize)]
768pub struct MergeResult {
769 pub commit_id: String,
771 pub summary: String,
773}
774
775#[derive(Debug, Clone, Serialize, Deserialize)]
777pub struct CommitInfo {
778 pub id: String,
779 pub message: String,
780 pub committer: String,
781 #[serde(default)]
782 pub metadata: HashMap<String, String>,
783 #[serde(default)]
785 pub creation_date: i64,
786}
787
788#[derive(Debug, Clone, Serialize, Deserialize)]
790pub struct DiffEntry {
791 pub path: String,
793 #[serde(rename = "type")]
795 pub diff_type: String,
796 #[serde(default)]
798 pub size_bytes: Option<u64>,
799}
800
801#[derive(Debug, Clone, Serialize, Deserialize)]
803pub struct ObjectStats {
804 pub path: String,
805 #[serde(default)]
806 pub physical_address: String,
807 pub size_bytes: u64,
808 #[serde(default)]
809 pub content_type: String,
810 #[serde(default)]
812 pub mtime: i64,
813 #[serde(default)]
814 pub checksum: String,
815}
816
817#[derive(Debug, Clone, Serialize, Deserialize)]
819pub struct LakeFSAction {
820 pub name: String,
821 pub on: Vec<String>,
823 pub hooks: Vec<ActionHook>,
824}
825
826#[derive(Debug, Clone, Serialize, Deserialize)]
828pub struct ActionHook {
829 pub id: String,
830 #[serde(rename = "type")]
831 pub hook_type: String,
832 pub description: String,
833 #[serde(default)]
834 pub properties: HashMap<String, String>,
835}
836
837#[derive(Debug, Clone, Serialize, Deserialize)]
839pub struct RepositoryInfo {
840 pub id: String,
841 pub storage_namespace: String,
842 pub default_branch: String,
843 #[serde(default)]
844 pub creation_date: i64,
845}
846
847#[derive(Debug, Clone, Serialize, Deserialize)]
849pub struct BranchInfo {
850 pub id: String,
851 pub commit_id: String,
852}
853
/// Client for one repository/branch on a lakeFS server.
///
/// Cloning shares the auth provider and the pending-write buffer, as both
/// are held behind `Arc`.
#[cfg(all(feature = "async", feature = "networking"))]
#[derive(Clone)]
pub struct LakeFSBackend {
    /// HTTP client used for API requests (JSON content type by default).
    client: Client,
    /// Base URL with trailing slashes removed.
    endpoint: String,
    /// Repository that requests are issued against.
    repository: String,
    /// Branch that requests are issued against.
    branch: String,
    /// Copied from the configuration; marked as possibly unused.
    #[allow(dead_code)]
    access_key: String,
    /// Copied from the configuration; marked as possibly unused.
    #[allow(dead_code)]
    secret_key: String,
    /// Supplies the `Authorization` header for authenticated requests.
    auth_provider: Arc<dyn AuthProvider>,
    /// Shared buffer of snapshots (its consumer is not part of this excerpt).
    pending_writes: Arc<Mutex<Vec<Snapshot>>>,
}
872
873#[cfg(all(feature = "async", feature = "networking"))]
874impl LakeFSBackend {
875 pub fn new(config: LakeFSConfig) -> Result<Self, StorageError> {
876 let endpoint = config.endpoint.trim_end_matches('/').to_string();
877
878 let auth = config.auth.clone().unwrap_or_else(|| LakeFSAuth::Basic {
880 access_key: config.access_key.clone(),
881 secret_key: config.secret_key.clone(),
882 });
883 let auth_provider = build_auth_provider(&auth, &endpoint)?;
884
885 let mut headers = HeaderMap::new();
887 headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
888
889 let client = Client::builder()
890 .default_headers(headers)
891 .timeout(std::time::Duration::from_secs(30))
892 .build()
893 .map_err(|e| {
894 StorageError::ConnectionError(format!("Failed to create HTTP client: {}", e))
895 })?;
896
897 Ok(Self {
898 client,
899 endpoint,
900 repository: config.repository,
901 branch: config.branch,
902 access_key: config.access_key,
903 secret_key: config.secret_key,
904 auth_provider,
905 pending_writes: Arc::new(Mutex::new(Vec::new())),
906 })
907 }
908
909 async fn send_authed(&self, builder: RequestBuilder) -> Result<Response, StorageError> {
914 let auth_header = self.auth_provider.auth_header().await?;
915 builder
916 .header(AUTHORIZATION, auth_header)
917 .send()
918 .await
919 .map_err(|e| StorageError::ConnectionError(format!("Request failed: {}", e)))
920 }
921
922 pub fn auth_mode(&self) -> &'static str {
924 self.auth_provider.mode_name()
925 }
926
927 pub fn repository(&self) -> &str {
931 &self.repository
932 }
933
934 pub fn branch(&self) -> &str {
936 &self.branch
937 }
938
939 pub fn endpoint(&self) -> &str {
941 &self.endpoint
942 }
943
944 pub async fn create_commit(&self, message: &str) -> Result<String, StorageError> {
948 let url = format!(
949 "{}/api/v1/repositories/{}/branches/{}/commits",
950 self.endpoint, self.repository, self.branch
951 );
952
953 #[derive(Serialize)]
954 struct CommitRequest {
955 message: String,
956 metadata: HashMap<String, String>,
957 }
958
959 let mut metadata = HashMap::new();
960 metadata.insert("source".to_string(), "briefcase-ai".to_string());
961
962 let request = CommitRequest {
963 message: message.to_string(),
964 metadata,
965 };
966
967 let response = self
968 .send_authed(self.client.post(&url).json(&request))
969 .await?;
970
971 let status = response.status();
972 if !status.is_success() {
973 let error_text = response.text().await.unwrap_or_default();
974 return Err(StorageError::ConnectionError(format!(
975 "Commit failed with status {}: {}",
976 status, error_text
977 )));
978 }
979
980 #[derive(Deserialize)]
981 struct CommitResponse {
982 id: String,
983 }
984
985 let commit_response: CommitResponse = response.json().await.map_err(|e| {
986 StorageError::SerializationError(format!("Failed to parse commit response: {}", e))
987 })?;
988
989 Ok(commit_response.id)
990 }
991
992 pub async fn upload_object(&self, path: &str, data: &[u8]) -> Result<(), StorageError> {
996 let staging_url = format!(
1006 "{}/api/v1/repositories/{}/branches/{}/staging/backing",
1007 self.endpoint, self.repository, self.branch
1008 );
1009
1010 let stage_resp = self
1011 .send_authed(
1012 self.client
1013 .get(&staging_url)
1014 .query(&[("path", path), ("presign", "true")]),
1015 )
1016 .await?;
1017
1018 if !stage_resp.status().is_success() {
1019 let status = stage_resp.status();
1020 let body = stage_resp.text().await.unwrap_or_default();
1021 return Err(StorageError::ConnectionError(format!(
1022 "Staging location request failed with status {}: {}",
1023 status, body
1024 )));
1025 }
1026
1027 let staging: serde_json::Value = stage_resp.json().await.map_err(|e| {
1028 StorageError::SerializationError(format!("Failed to parse staging response: {}", e))
1029 })?;
1030
1031 let physical_address = staging["physical_address"]
1032 .as_str()
1033 .ok_or_else(|| {
1034 StorageError::ConnectionError(format!(
1035 "Staging response missing physical_address. Response: {}",
1036 staging
1037 ))
1038 })?
1039 .to_string();
1040
1041 let presign_url = staging["presigned_url"]
1043 .as_str()
1044 .ok_or_else(|| {
1045 StorageError::ConnectionError(format!(
1046 "Staging response missing presigned_url. Response: {}",
1047 staging
1048 ))
1049 })?
1050 .to_string();
1051
1052 let s3_resp = self
1054 .client
1055 .put(&presign_url)
1056 .body(data.to_vec())
1057 .send()
1058 .await
1059 .map_err(|e| {
1060 StorageError::ConnectionError(format!("S3 presigned upload failed: {}", e))
1061 })?;
1062
1063 if !s3_resp.status().is_success() {
1064 let status = s3_resp.status();
1065 let body = s3_resp.text().await.unwrap_or_default();
1066 return Err(StorageError::ConnectionError(format!(
1067 "S3 presigned upload failed with status {}: {}",
1068 status, body
1069 )));
1070 }
1071
1072 let etag = s3_resp
1074 .headers()
1075 .get("etag")
1076 .and_then(|v| v.to_str().ok())
1077 .unwrap_or("")
1078 .trim_matches('"')
1079 .to_string();
1080
1081 let objects_url = format!(
1085 "{}/api/v1/repositories/{}/branches/{}/objects",
1086 self.endpoint, self.repository, self.branch
1087 );
1088
1089 let link_body = serde_json::json!({
1090 "physical_address": physical_address,
1091 "checksum": etag,
1092 "size_bytes": data.len() as i64,
1093 });
1094
1095 let link_resp = self
1096 .send_authed(
1097 self.client
1098 .put(&objects_url)
1099 .query(&[("path", path)])
1100 .json(&link_body),
1101 )
1102 .await?;
1103
1104 if !link_resp.status().is_success() {
1105 let status = link_resp.status();
1106 let body = link_resp.text().await.unwrap_or_default();
1107 return Err(StorageError::ConnectionError(format!(
1108 "Staging link failed with status {}: {}",
1109 status, body
1110 )));
1111 }
1112
1113 Ok(())
1114 }
1115
1116 pub async fn download_object(&self, path: &str) -> Result<Vec<u8>, StorageError> {
1118 let url = format!(
1119 "{}/api/v1/repositories/{}/refs/{}/objects",
1120 self.endpoint, self.repository, self.branch
1121 );
1122
1123 let response = self
1124 .send_authed(self.client.get(&url).query(&[("path", path)]))
1125 .await?;
1126
1127 if response.status() == reqwest::StatusCode::NOT_FOUND {
1128 return Err(StorageError::NotFound(format!(
1129 "Object not found: {}",
1130 path
1131 )));
1132 }
1133
1134 let status = response.status();
1135 if !status.is_success() {
1136 let error_text = response.text().await.unwrap_or_default();
1137 return Err(StorageError::ConnectionError(format!(
1138 "Download failed with status {}: {}",
1139 status, error_text
1140 )));
1141 }
1142
1143 let data = response.bytes().await.map_err(|e| {
1144 StorageError::ConnectionError(format!("Failed to read response: {}", e))
1145 })?;
1146
1147 Ok(data.to_vec())
1148 }
1149
1150 pub async fn list_objects(&self, prefix: &str) -> Result<Vec<String>, StorageError> {
1152 let url = format!(
1153 "{}/api/v1/repositories/{}/refs/{}/objects/ls",
1154 self.endpoint, self.repository, self.branch
1155 );
1156
1157 let response = self
1158 .send_authed(self.client.get(&url).query(&[("prefix", prefix)]))
1159 .await?;
1160
1161 let status = response.status();
1162 if !status.is_success() {
1163 let error_text = response.text().await.unwrap_or_default();
1164 return Err(StorageError::ConnectionError(format!(
1165 "List failed with status {}: {}",
1166 status, error_text
1167 )));
1168 }
1169
1170 #[derive(Deserialize)]
1171 struct ListResponse {
1172 results: Vec<ObjectInfo>,
1173 }
1174
1175 #[derive(Deserialize)]
1176 struct ObjectInfo {
1177 path: String,
1178 #[serde(rename = "type", alias = "path_type")]
1180 object_type: String,
1181 }
1182
1183 let list_response: ListResponse = response.json().await.map_err(|e| {
1184 StorageError::SerializationError(format!("Failed to parse list response: {}", e))
1185 })?;
1186
1187 let paths = list_response
1188 .results
1189 .into_iter()
1190 .filter(|obj| obj.object_type == "object")
1191 .map(|obj| obj.path)
1192 .collect();
1193
1194 Ok(paths)
1195 }
1196
1197 pub async fn delete_object(&self, path: &str) -> Result<(), StorageError> {
1199 if path.is_empty() {
1200 return Err(StorageError::InvalidQuery(
1201 "Object path cannot be empty".to_string(),
1202 ));
1203 }
1204
1205 let url = format!(
1206 "{}/api/v1/repositories/{}/branches/{}/objects",
1207 self.endpoint, self.repository, self.branch
1208 );
1209
1210 let response = self
1211 .send_authed(self.client.delete(&url).query(&[("path", path)]))
1212 .await?;
1213
1214 if response.status() == reqwest::StatusCode::NOT_FOUND {
1215 return Err(StorageError::NotFound(format!(
1216 "Object not found: {}",
1217 path
1218 )));
1219 }
1220
1221 let status = response.status();
1222 if !status.is_success() {
1223 let error_text = response.text().await.unwrap_or_default();
1224 return Err(StorageError::ConnectionError(format!(
1225 "Delete failed with status {}: {}",
1226 status, error_text
1227 )));
1228 }
1229
1230 Ok(())
1231 }
1232
1233 pub async fn get_object_stats(&self, path: &str) -> Result<ObjectStats, StorageError> {
1235 if path.is_empty() {
1236 return Err(StorageError::InvalidQuery(
1237 "Object path cannot be empty".to_string(),
1238 ));
1239 }
1240
1241 let url = format!(
1242 "{}/api/v1/repositories/{}/refs/{}/objects/stat",
1243 self.endpoint, self.repository, self.branch
1244 );
1245
1246 let response = self
1247 .send_authed(self.client.get(&url).query(&[("path", path)]))
1248 .await?;
1249
1250 if response.status() == reqwest::StatusCode::NOT_FOUND {
1251 return Err(StorageError::NotFound(format!(
1252 "Object not found: {}",
1253 path
1254 )));
1255 }
1256
1257 let status = response.status();
1258 if !status.is_success() {
1259 let error_text = response.text().await.unwrap_or_default();
1260 return Err(StorageError::ConnectionError(format!(
1261 "Stats failed with status {}: {}",
1262 status, error_text
1263 )));
1264 }
1265
1266 let stats: ObjectStats = response.json().await.map_err(|e| {
1267 StorageError::SerializationError(format!("Failed to parse stats response: {}", e))
1268 })?;
1269
1270 Ok(stats)
1271 }
1272
1273 pub async fn create_repository(
1277 &self,
1278 name: &str,
1279 storage_namespace: &str,
1280 default_branch: Option<&str>,
1281 ) -> Result<RepositoryInfo, StorageError> {
1282 if name.is_empty() {
1283 return Err(StorageError::InvalidQuery(
1284 "Repository name cannot be empty".to_string(),
1285 ));
1286 }
1287 if storage_namespace.is_empty() {
1288 return Err(StorageError::InvalidQuery(
1289 "Storage namespace cannot be empty".to_string(),
1290 ));
1291 }
1292
1293 let url = format!("{}/api/v1/repositories", self.endpoint);
1294
1295 #[derive(Serialize)]
1296 struct CreateRepoRequest {
1297 name: String,
1298 storage_namespace: String,
1299 default_branch: String,
1300 }
1301
1302 let request = CreateRepoRequest {
1303 name: name.to_string(),
1304 storage_namespace: storage_namespace.to_string(),
1305 default_branch: default_branch.unwrap_or("main").to_string(),
1306 };
1307
1308 let response = self
1309 .send_authed(self.client.post(&url).json(&request))
1310 .await?;
1311
1312 if response.status() == reqwest::StatusCode::CONFLICT {
1313 return Err(StorageError::ConnectionError(format!(
1314 "Repository '{}' already exists",
1315 name
1316 )));
1317 }
1318
1319 let status = response.status();
1320 if !status.is_success() {
1321 let error_text = response.text().await.unwrap_or_default();
1322 return Err(StorageError::ConnectionError(format!(
1323 "Create repository failed with status {}: {}",
1324 status, error_text
1325 )));
1326 }
1327
1328 let info: RepositoryInfo = response.json().await.map_err(|e| {
1329 StorageError::SerializationError(format!("Failed to parse repository response: {}", e))
1330 })?;
1331
1332 Ok(info)
1333 }
1334
1335 pub async fn delete_repository(&self, name: &str) -> Result<(), StorageError> {
1337 if name.is_empty() {
1338 return Err(StorageError::InvalidQuery(
1339 "Repository name cannot be empty".to_string(),
1340 ));
1341 }
1342
1343 let url = format!("{}/api/v1/repositories/{}", self.endpoint, name);
1344
1345 let response = self.send_authed(self.client.delete(&url)).await?;
1346
1347 if response.status() == reqwest::StatusCode::NOT_FOUND {
1348 return Err(StorageError::NotFound(format!(
1349 "Repository not found: {}",
1350 name
1351 )));
1352 }
1353
1354 let status = response.status();
1355 if !status.is_success() {
1356 let error_text = response.text().await.unwrap_or_default();
1357 return Err(StorageError::ConnectionError(format!(
1358 "Delete repository failed with status {}: {}",
1359 status, error_text
1360 )));
1361 }
1362
1363 Ok(())
1364 }
1365
1366 pub async fn create_branch(
1370 &self,
1371 name: &str,
1372 source_branch: &str,
1373 ) -> Result<String, StorageError> {
1374 if name.is_empty() {
1375 return Err(StorageError::InvalidQuery(
1376 "Branch name cannot be empty".to_string(),
1377 ));
1378 }
1379 if source_branch.is_empty() {
1380 return Err(StorageError::InvalidQuery(
1381 "Source branch cannot be empty".to_string(),
1382 ));
1383 }
1384
1385 let url = format!(
1386 "{}/api/v1/repositories/{}/branches",
1387 self.endpoint, self.repository
1388 );
1389
1390 #[derive(Serialize)]
1391 struct CreateBranchRequest {
1392 name: String,
1393 source: String,
1394 }
1395
1396 let request = CreateBranchRequest {
1397 name: name.to_string(),
1398 source: source_branch.to_string(),
1399 };
1400
1401 let response = self
1402 .send_authed(self.client.post(&url).json(&request))
1403 .await?;
1404
1405 if response.status() == reqwest::StatusCode::CONFLICT {
1406 return Err(StorageError::ConnectionError(format!(
1407 "Branch '{}' already exists",
1408 name
1409 )));
1410 }
1411
1412 let status = response.status();
1413 if !status.is_success() {
1414 let error_text = response.text().await.unwrap_or_default();
1415 return Err(StorageError::ConnectionError(format!(
1416 "Create branch failed with status {}: {}",
1417 status, error_text
1418 )));
1419 }
1420
1421 let commit_id = response.text().await.map_err(|e| {
1423 StorageError::SerializationError(format!("Failed to read branch response: {}", e))
1424 })?;
1425
1426 let commit_id = commit_id.trim().trim_matches('"').to_string();
1428 Ok(commit_id)
1429 }
1430
1431 pub async fn delete_branch(&self, name: &str) -> Result<(), StorageError> {
1433 if name.is_empty() {
1434 return Err(StorageError::InvalidQuery(
1435 "Branch name cannot be empty".to_string(),
1436 ));
1437 }
1438
1439 let url = format!(
1440 "{}/api/v1/repositories/{}/branches/{}",
1441 self.endpoint, self.repository, name
1442 );
1443
1444 let response = self.send_authed(self.client.delete(&url)).await?;
1445
1446 if response.status() == reqwest::StatusCode::NOT_FOUND {
1447 return Err(StorageError::NotFound(format!(
1448 "Branch not found: {}",
1449 name
1450 )));
1451 }
1452
1453 let status = response.status();
1454 if !status.is_success() {
1455 let error_text = response.text().await.unwrap_or_default();
1456 return Err(StorageError::ConnectionError(format!(
1457 "Delete branch failed with status {}: {}",
1458 status, error_text
1459 )));
1460 }
1461
1462 Ok(())
1463 }
1464
1465 pub async fn merge_branch(
1467 &self,
1468 source_branch: &str,
1469 destination_branch: &str,
1470 message: Option<&str>,
1471 ) -> Result<MergeResult, StorageError> {
1472 if source_branch.is_empty() || destination_branch.is_empty() {
1473 return Err(StorageError::InvalidQuery(
1474 "Source and destination branches cannot be empty".to_string(),
1475 ));
1476 }
1477
1478 let url = format!(
1479 "{}/api/v1/repositories/{}/refs/{}/merge/{}",
1480 self.endpoint, self.repository, source_branch, destination_branch
1481 );
1482
1483 #[derive(Serialize)]
1484 struct MergeRequest {
1485 message: String,
1486 metadata: HashMap<String, String>,
1487 }
1488
1489 let mut metadata = HashMap::new();
1490 metadata.insert("source".to_string(), "briefcase-ai".to_string());
1491
1492 let request = MergeRequest {
1493 message: message
1494 .unwrap_or(&format!(
1495 "Merge {} into {}",
1496 source_branch, destination_branch
1497 ))
1498 .to_string(),
1499 metadata,
1500 };
1501
1502 let response = self
1503 .send_authed(self.client.post(&url).json(&request))
1504 .await?;
1505
1506 if response.status() == reqwest::StatusCode::CONFLICT {
1507 return Err(StorageError::ConnectionError(
1508 "Merge conflict detected".to_string(),
1509 ));
1510 }
1511
1512 let status = response.status();
1513 if !status.is_success() {
1514 let error_text = response.text().await.unwrap_or_default();
1515 return Err(StorageError::ConnectionError(format!(
1516 "Merge failed with status {}: {}",
1517 status, error_text
1518 )));
1519 }
1520
1521 #[derive(Deserialize)]
1523 struct MergeResponse {
1524 #[serde(default)]
1525 reference: String,
1526 #[serde(default)]
1527 summary: HashMap<String, serde_json::Value>,
1528 }
1529
1530 let merge_resp: MergeResponse = response.json().await.map_err(|e| {
1531 StorageError::SerializationError(format!("Failed to parse merge response: {}", e))
1532 })?;
1533
1534 Ok(MergeResult {
1535 commit_id: merge_resp.reference,
1536 summary: serde_json::to_string(&merge_resp.summary).unwrap_or_default(),
1537 })
1538 }
1539
1540 pub async fn get_commit(&self, commit_id: &str) -> Result<CommitInfo, StorageError> {
1544 if commit_id.is_empty() {
1545 return Err(StorageError::InvalidQuery(
1546 "Commit ID cannot be empty".to_string(),
1547 ));
1548 }
1549
1550 let url = format!(
1551 "{}/api/v1/repositories/{}/commits/{}",
1552 self.endpoint, self.repository, commit_id
1553 );
1554
1555 let response = self.send_authed(self.client.get(&url)).await?;
1556
1557 if response.status() == reqwest::StatusCode::NOT_FOUND {
1558 return Err(StorageError::NotFound(format!(
1559 "Commit not found: {}",
1560 commit_id
1561 )));
1562 }
1563
1564 let status = response.status();
1565 if !status.is_success() {
1566 let error_text = response.text().await.unwrap_or_default();
1567 return Err(StorageError::ConnectionError(format!(
1568 "Get commit failed with status {}: {}",
1569 status, error_text
1570 )));
1571 }
1572
1573 let info: CommitInfo = response.json().await.map_err(|e| {
1574 StorageError::SerializationError(format!("Failed to parse commit response: {}", e))
1575 })?;
1576
1577 Ok(info)
1578 }
1579
1580 pub async fn diff_refs(
1584 &self,
1585 left_ref: &str,
1586 right_ref: &str,
1587 ) -> Result<Vec<DiffEntry>, StorageError> {
1588 if left_ref.is_empty() || right_ref.is_empty() {
1589 return Err(StorageError::InvalidQuery(
1590 "Both refs must be non-empty".to_string(),
1591 ));
1592 }
1593
1594 let url = format!(
1595 "{}/api/v1/repositories/{}/refs/{}/diff/{}",
1596 self.endpoint, self.repository, left_ref, right_ref
1597 );
1598
1599 let response = self.send_authed(self.client.get(&url)).await?;
1600
1601 let status = response.status();
1602 if !status.is_success() {
1603 let error_text = response.text().await.unwrap_or_default();
1604 return Err(StorageError::ConnectionError(format!(
1605 "Diff failed with status {}: {}",
1606 status, error_text
1607 )));
1608 }
1609
1610 #[derive(Deserialize)]
1611 struct DiffResponse {
1612 results: Vec<DiffEntry>,
1613 }
1614
1615 let diff_resp: DiffResponse = response.json().await.map_err(|e| {
1616 StorageError::SerializationError(format!("Failed to parse diff response: {}", e))
1617 })?;
1618
1619 Ok(diff_resp.results)
1620 }
1621
1622 pub async fn register_action(&self, action: &LakeFSAction) -> Result<String, StorageError> {
1627 if action.name.is_empty() {
1628 return Err(StorageError::InvalidQuery(
1629 "Action name cannot be empty".to_string(),
1630 ));
1631 }
1632 if action.on.is_empty() {
1633 return Err(StorageError::InvalidQuery(
1634 "Action must have at least one event trigger".to_string(),
1635 ));
1636 }
1637 if action.hooks.is_empty() {
1638 return Err(StorageError::InvalidQuery(
1639 "Action must have at least one hook".to_string(),
1640 ));
1641 }
1642
1643 let yaml_content = self.build_action_yaml(action);
1645
1646 let path = format!("_lakefs_actions/{}.yaml", action.name);
1648 self.upload_object(&path, yaml_content.as_bytes()).await?;
1649
1650 let commit_msg = format!("Register LakeFS Action: {}", action.name);
1652 let commit_id = self.create_commit(&commit_msg).await?;
1653
1654 Ok(commit_id)
1655 }
1656
1657 fn build_action_yaml(&self, action: &LakeFSAction) -> String {
1659 let mut yaml = String::new();
1660 yaml.push_str(&format!("name: {}\n", action.name));
1661 yaml.push_str("on:\n");
1662 for event in &action.on {
1663 yaml.push_str(&format!(" {}:\n", event));
1664 }
1665 yaml.push_str("hooks:\n");
1666 for hook in &action.hooks {
1667 yaml.push_str(&format!(" - id: {}\n", hook.id));
1668 yaml.push_str(&format!(" type: {}\n", hook.hook_type));
1669 yaml.push_str(&format!(" description: {}\n", hook.description));
1670 if !hook.properties.is_empty() {
1671 yaml.push_str(" properties:\n");
1672 for (k, v) in &hook.properties {
1673 yaml.push_str(&format!(" {}: {}\n", k, v));
1674 }
1675 }
1676 }
1677 yaml
1678 }
1679
1680 fn snapshot_path(&self, snapshot_id: &str) -> String {
1684 format!("snapshots/{}.json", snapshot_id)
1685 }
1686
1687 fn decision_path(&self, decision_id: &str) -> String {
1689 format!("decisions/{}.json", decision_id)
1690 }
1691}
1692
#[cfg(all(feature = "async", feature = "networking"))]
#[async_trait::async_trait]
impl StorageBackend for LakeFSBackend {
    /// Queues a copy of `snapshot` for the next [`flush`](Self::flush) and
    /// returns its ID. Nothing is sent to the server here.
    async fn save(&self, snapshot: &Snapshot) -> Result<String, StorageError> {
        let snapshot_id = snapshot.metadata.snapshot_id.to_string();

        // A poisoned lock only means another thread panicked while holding it;
        // the queue itself is still usable, so recover instead of panicking.
        self.pending_writes
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .push(snapshot.clone());

        Ok(snapshot_id)
    }

    /// Serializes `decision` to JSON and uploads it immediately to its decision
    /// path. Returns the decision's ID.
    ///
    /// # Errors
    ///
    /// [`StorageError::SerializationError`] when encoding fails, or any error
    /// from `upload_object`.
    async fn save_decision(&self, decision: &DecisionSnapshot) -> Result<String, StorageError> {
        let decision_id = decision.metadata.snapshot_id.to_string();
        let path = self.decision_path(&decision_id);

        let json_data = serde_json::to_vec(decision)
            .map_err(|e| StorageError::SerializationError(e.to_string()))?;

        self.upload_object(&path, &json_data).await?;

        Ok(decision_id)
    }

    /// Downloads and decodes the snapshot stored under `snapshot_id`.
    ///
    /// # Errors
    ///
    /// Any error from `download_object`, or
    /// [`StorageError::SerializationError`] when the payload is not a valid
    /// [`Snapshot`].
    async fn load(&self, snapshot_id: &str) -> Result<Snapshot, StorageError> {
        let path = self.snapshot_path(snapshot_id);
        let data = self.download_object(&path).await?;

        serde_json::from_slice(&data).map_err(|e| StorageError::SerializationError(e.to_string()))
    }

    /// Downloads and decodes the decision stored under `decision_id`.
    ///
    /// # Errors
    ///
    /// Any error from `download_object`, or
    /// [`StorageError::SerializationError`] when the payload is not a valid
    /// [`DecisionSnapshot`].
    async fn load_decision(&self, decision_id: &str) -> Result<DecisionSnapshot, StorageError> {
        let path = self.decision_path(decision_id);
        let data = self.download_object(&path).await?;

        serde_json::from_slice(&data).map_err(|e| StorageError::SerializationError(e.to_string()))
    }

    /// Returns the stored snapshots that satisfy `query`, newest first.
    ///
    /// Every object listed under `snapshots/` whose name ends in `.json` is
    /// loaded and filtered with `matches_query`. The matches are sorted by
    /// timestamp (descending) and only then paginated with `query.offset` /
    /// `query.limit`, so pages are taken from the sorted result rather than
    /// from the arbitrary listing order.
    ///
    /// Snapshots that fail to load or decode are skipped (best effort).
    ///
    /// # Errors
    ///
    /// Any error from `list_objects`.
    async fn query(&self, query: SnapshotQuery) -> Result<Vec<Snapshot>, StorageError> {
        let paths = self.list_objects("snapshots/").await?;

        let mut matching = Vec::new();
        for path in paths {
            let snapshot_id = match path
                .rsplit('/')
                .next()
                .and_then(|file_name| file_name.strip_suffix(".json"))
            {
                Some(id) => id,
                None => continue,
            };

            match self.load(snapshot_id).await {
                Ok(snapshot) => {
                    if self.matches_query(&snapshot, &query) {
                        matching.push(snapshot);
                    }
                }
                // Deliberate best effort: one unreadable object must not fail
                // the whole query.
                Err(_) => continue,
            }
        }

        matching.sort_by(|a, b| b.metadata.timestamp.cmp(&a.metadata.timestamp));

        let offset = query.offset.unwrap_or(0);
        let limit = query.limit.unwrap_or(usize::MAX);

        Ok(matching.into_iter().skip(offset).take(limit).collect())
    }

    /// Deletes the snapshot stored under `snapshot_id`.
    ///
    /// Returns `Ok(true)` when an object was deleted and `Ok(false)` when no
    /// such object exists (including when it disappears between the existence
    /// check and the delete request).
    ///
    /// # Errors
    ///
    /// Any non-`NotFound` error from `get_object_stats` or `delete_object`.
    async fn delete(&self, snapshot_id: &str) -> Result<bool, StorageError> {
        let path = self.snapshot_path(snapshot_id);

        match self.get_object_stats(&path).await {
            Ok(_) => {}
            Err(StorageError::NotFound(_)) => return Ok(false),
            Err(e) => return Err(e),
        }

        match self.delete_object(&path).await {
            Ok(()) => Ok(true),
            // Lost a race with another deleter: the object is gone either way.
            Err(StorageError::NotFound(_)) => Ok(false),
            Err(e) => Err(e),
        }
    }

    /// Uploads every queued snapshot (and each of its decisions) and records
    /// them with a single commit.
    ///
    /// Returns the number of snapshots written, the total number of serialized
    /// bytes, and the commit ID as `checkpoint_id`. With an empty queue no
    /// request is made and `checkpoint_id` is `None`.
    ///
    /// If any upload or the commit fails, the drained snapshots are put back at
    /// the front of the queue (ahead of anything saved in the meantime) so a
    /// later `flush` can retry them instead of silently losing data.
    ///
    /// # Errors
    ///
    /// [`StorageError::SerializationError`] when encoding fails, or any error
    /// from `upload_object` / `create_commit`.
    async fn flush(&self) -> Result<FlushResult, StorageError> {
        let pending_snapshots = {
            let mut pending = self
                .pending_writes
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            std::mem::take(&mut *pending)
        };

        if pending_snapshots.is_empty() {
            return Ok(FlushResult {
                snapshots_written: 0,
                bytes_written: 0,
                checkpoint_id: None,
            });
        }

        // Run the fallible part as one unit so a failure anywhere can be
        // handled in a single place below.
        let outcome: Result<(usize, String), StorageError> = async {
            let mut bytes_written = 0;

            for snapshot in &pending_snapshots {
                let snapshot_id = snapshot.metadata.snapshot_id.to_string();
                let path = self.snapshot_path(&snapshot_id);

                let json_data = serde_json::to_vec(snapshot)
                    .map_err(|e| StorageError::SerializationError(e.to_string()))?;
                bytes_written += json_data.len();
                self.upload_object(&path, &json_data).await?;

                for decision in &snapshot.decisions {
                    let decision_id = decision.metadata.snapshot_id.to_string();
                    let decision_path = self.decision_path(&decision_id);

                    let decision_data = serde_json::to_vec(decision)
                        .map_err(|e| StorageError::SerializationError(e.to_string()))?;
                    bytes_written += decision_data.len();
                    self.upload_object(&decision_path, &decision_data).await?;
                }
            }

            let commit_message =
                format!("Briefcase AI flush: {} snapshots", pending_snapshots.len());
            let commit_id = self.create_commit(&commit_message).await?;

            Ok((bytes_written, commit_id))
        }
        .await;

        match outcome {
            Ok((bytes_written, commit_id)) => Ok(FlushResult {
                snapshots_written: pending_snapshots.len(),
                bytes_written,
                checkpoint_id: Some(commit_id),
            }),
            Err(err) => {
                // Re-queue the drained snapshots ahead of newer arrivals.
                let mut pending = self
                    .pending_writes
                    .lock()
                    .unwrap_or_else(|poisoned| poisoned.into_inner());
                let queued_meanwhile = std::mem::replace(&mut *pending, pending_snapshots);
                pending.extend(queued_meanwhile);
                Err(err)
            }
        }
    }

    /// Probes the configured repository endpoint and reports whether the server
    /// answered with a success status.
    ///
    /// # Errors
    ///
    /// [`StorageError::ConnectionError`] when the request itself fails.
    async fn health_check(&self) -> Result<bool, StorageError> {
        let url = format!("{}/api/v1/repositories/{}", self.endpoint, self.repository);

        let response = self
            .send_authed(self.client.get(&url))
            .await
            .map_err(|e| StorageError::ConnectionError(format!("Health check failed: {}", e)))?;

        Ok(response.status().is_success())
    }
}
1860
#[cfg(all(feature = "async", feature = "networking"))]
impl LakeFSBackend {
    /// Decides whether `snapshot` satisfies every filter present in `query`.
    ///
    /// * `start_time` / `end_time` bound the snapshot timestamp inclusively.
    /// * When any of `function_name`, `module_name`, `model_name` or `tags` is
    ///   set, at least one single decision in the snapshot must satisfy all of
    ///   the decision-level filters that are set; a snapshot without decisions
    ///   therefore never matches such a query.
    /// * A decision without model parameters never matches a `model_name`
    ///   filter, and every requested tag must be present with an equal value.
    fn matches_query(&self, snapshot: &Snapshot, query: &SnapshotQuery) -> bool {
        let too_early = query
            .start_time
            .map_or(false, |earliest| snapshot.metadata.timestamp < earliest);
        let too_late = query
            .end_time
            .map_or(false, |latest| snapshot.metadata.timestamp > latest);
        if too_early || too_late {
            return false;
        }

        let has_decision_filter = query.function_name.is_some()
            || query.module_name.is_some()
            || query.model_name.is_some()
            || query.tags.is_some();
        if !has_decision_filter {
            return true;
        }

        snapshot.decisions.iter().any(|decision| {
            let function_ok = query
                .function_name
                .as_ref()
                .map_or(true, |wanted| decision.function_name == *wanted);

            let module_ok = query
                .module_name
                .as_ref()
                .map_or(true, |wanted| decision.module_name.as_ref() == Some(wanted));

            let model_ok = query.model_name.as_ref().map_or(true, |wanted| {
                decision
                    .model_parameters
                    .as_ref()
                    .map_or(false, |params| params.model_name == *wanted)
            });

            let tags_ok = query.tags.as_ref().map_or(true, |wanted| {
                wanted
                    .iter()
                    .all(|(key, value)| decision.tags.get(key) == Some(value))
            });

            function_ok && module_ok && model_ok && tags_ok
        })
    }
}
1934
1935#[derive(Debug, Clone)]
1938pub struct LakeFSConfig {
1939 pub endpoint: String,
1940 pub repository: String,
1941 pub branch: String,
1942 pub access_key: String,
1943 pub secret_key: String,
1944 pub auth: Option<LakeFSAuth>,
1947}
1948
1949impl LakeFSConfig {
1950 pub fn new(
1951 endpoint: impl Into<String>,
1952 repository: impl Into<String>,
1953 branch: impl Into<String>,
1954 access_key: impl Into<String>,
1955 secret_key: impl Into<String>,
1956 ) -> Self {
1957 Self {
1958 endpoint: endpoint.into(),
1959 repository: repository.into(),
1960 branch: branch.into(),
1961 access_key: access_key.into(),
1962 secret_key: secret_key.into(),
1963 auth: None,
1964 }
1965 }
1966
1967 pub fn with_auth(mut self, auth: LakeFSAuth) -> Self {
1969 self.auth = Some(auth);
1970 self
1971 }
1972}
1973
1974#[cfg(test)]
1977mod tests {
1978 use super::*;
1979 use crate::models::*;
1980 use serde_json::json;
1981 use wiremock::matchers::{method, path, query_param};
1982 use wiremock::{Mock, MockServer, ResponseTemplate};
1983
1984 fn create_test_config_with_endpoint(endpoint: &str) -> LakeFSConfig {
1985 LakeFSConfig::new(
1986 endpoint,
1987 "briefcase-test",
1988 "main",
1989 "test_key",
1990 "test_secret",
1991 )
1992 }
1993
1994 fn create_test_config() -> LakeFSConfig {
1995 LakeFSConfig::new(
1996 "http://localhost:8000",
1997 "briefcase-test",
1998 "main",
1999 "test_access_key",
2000 "test_secret_key",
2001 )
2002 }
2003
2004 async fn create_test_snapshot() -> Snapshot {
2005 let input = Input::new("test_input", json!("value"), "string");
2006 let output = Output::new("test_output", json!("result"), "string");
2007 let model_params = ModelParameters::new("gpt-4");
2008
2009 let decision = DecisionSnapshot::new("test_function")
2010 .with_module("test_module")
2011 .add_input(input)
2012 .add_output(output)
2013 .with_model_parameters(model_params)
2014 .add_tag("env", "test");
2015
2016 let mut snapshot = Snapshot::new(SnapshotType::Session);
2017 snapshot.add_decision(decision);
2018 snapshot
2019 }
2020
2021 #[tokio::test]
2024 async fn test_lakefs_config_creation() {
2025 let config = create_test_config();
2026 assert_eq!(config.endpoint, "http://localhost:8000");
2027 assert_eq!(config.repository, "briefcase-test");
2028 assert_eq!(config.branch, "main");
2029 }
2030
2031 #[tokio::test]
2032 async fn test_object_paths() {
2033 let config = create_test_config();
2034 let backend = LakeFSBackend::new(config).unwrap();
2035
2036 let snapshot_id = "test-snapshot-123";
2037 let decision_id = "test-decision-456";
2038
2039 assert_eq!(
2040 backend.snapshot_path(snapshot_id),
2041 "snapshots/test-snapshot-123.json"
2042 );
2043 assert_eq!(
2044 backend.decision_path(decision_id),
2045 "decisions/test-decision-456.json"
2046 );
2047 }
2048
2049 #[tokio::test]
2050 async fn test_query_matching() {
2051 let config = create_test_config();
2052 let backend = LakeFSBackend::new(config).unwrap();
2053 let snapshot = create_test_snapshot().await;
2054
2055 let query = SnapshotQuery::new().with_function_name("test_function");
2057 assert!(backend.matches_query(&snapshot, &query));
2058
2059 let query = SnapshotQuery::new().with_function_name("other_function");
2060 assert!(!backend.matches_query(&snapshot, &query));
2061
2062 let query = SnapshotQuery::new().with_tag("env", "test");
2064 assert!(backend.matches_query(&snapshot, &query));
2065
2066 let query = SnapshotQuery::new().with_tag("env", "prod");
2067 assert!(!backend.matches_query(&snapshot, &query));
2068
2069 let query = SnapshotQuery::new().with_model_name("gpt-4");
2071 assert!(backend.matches_query(&snapshot, &query));
2072
2073 let query = SnapshotQuery::new().with_model_name("claude-3");
2074 assert!(!backend.matches_query(&snapshot, &query));
2075 }
2076
2077 #[tokio::test]
2078 async fn test_pending_writes() {
2079 let config = create_test_config();
2080 let backend = LakeFSBackend::new(config).unwrap();
2081 let snapshot = create_test_snapshot().await;
2082
2083 let snapshot_id = backend.save(&snapshot).await.unwrap();
2085 assert_eq!(snapshot_id, snapshot.metadata.snapshot_id.to_string());
2086
2087 {
2089 let pending = backend.pending_writes.lock().unwrap();
2090 assert_eq!(pending.len(), 1);
2091 }
2092 }
2093
2094 #[tokio::test]
2095 async fn test_accessors() {
2096 let config = create_test_config();
2097 let backend = LakeFSBackend::new(config).unwrap();
2098
2099 assert_eq!(backend.repository(), "briefcase-test");
2100 assert_eq!(backend.branch(), "main");
2101 assert_eq!(backend.endpoint(), "http://localhost:8000");
2102 }
2103
2104 #[tokio::test]
2107 async fn test_create_repository_empty_name() {
2108 let config = create_test_config();
2109 let backend = LakeFSBackend::new(config).unwrap();
2110
2111 let result = backend.create_repository("", "s3://bucket", None).await;
2112 assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2113 }
2114
2115 #[tokio::test]
2116 async fn test_create_repository_empty_namespace() {
2117 let config = create_test_config();
2118 let backend = LakeFSBackend::new(config).unwrap();
2119
2120 let result = backend.create_repository("my-repo", "", None).await;
2121 assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2122 }
2123
2124 #[tokio::test]
2125 async fn test_delete_repository_empty_name() {
2126 let config = create_test_config();
2127 let backend = LakeFSBackend::new(config).unwrap();
2128
2129 let result = backend.delete_repository("").await;
2130 assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2131 }
2132
2133 #[tokio::test]
2134 async fn test_create_branch_empty_name() {
2135 let config = create_test_config();
2136 let backend = LakeFSBackend::new(config).unwrap();
2137
2138 let result = backend.create_branch("", "main").await;
2139 assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2140 }
2141
2142 #[tokio::test]
2143 async fn test_create_branch_empty_source() {
2144 let config = create_test_config();
2145 let backend = LakeFSBackend::new(config).unwrap();
2146
2147 let result = backend.create_branch("feature", "").await;
2148 assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2149 }
2150
2151 #[tokio::test]
2152 async fn test_delete_branch_empty_name() {
2153 let config = create_test_config();
2154 let backend = LakeFSBackend::new(config).unwrap();
2155
2156 let result = backend.delete_branch("").await;
2157 assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2158 }
2159
2160 #[tokio::test]
2161 async fn test_merge_branch_empty_source() {
2162 let config = create_test_config();
2163 let backend = LakeFSBackend::new(config).unwrap();
2164
2165 let result = backend.merge_branch("", "main", None).await;
2166 assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2167 }
2168
2169 #[tokio::test]
2170 async fn test_merge_branch_empty_destination() {
2171 let config = create_test_config();
2172 let backend = LakeFSBackend::new(config).unwrap();
2173
2174 let result = backend.merge_branch("feature", "", None).await;
2175 assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2176 }
2177
2178 #[tokio::test]
2179 async fn test_get_commit_empty_id() {
2180 let config = create_test_config();
2181 let backend = LakeFSBackend::new(config).unwrap();
2182
2183 let result = backend.get_commit("").await;
2184 assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2185 }
2186
2187 #[tokio::test]
2188 async fn test_diff_refs_empty_left() {
2189 let config = create_test_config();
2190 let backend = LakeFSBackend::new(config).unwrap();
2191
2192 let result = backend.diff_refs("", "main").await;
2193 assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2194 }
2195
2196 #[tokio::test]
2197 async fn test_diff_refs_empty_right() {
2198 let config = create_test_config();
2199 let backend = LakeFSBackend::new(config).unwrap();
2200
2201 let result = backend.diff_refs("feature", "").await;
2202 assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2203 }
2204
2205 #[tokio::test]
2206 async fn test_delete_object_empty_path() {
2207 let config = create_test_config();
2208 let backend = LakeFSBackend::new(config).unwrap();
2209
2210 let result = backend.delete_object("").await;
2211 assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2212 }
2213
2214 #[tokio::test]
2215 async fn test_get_object_stats_empty_path() {
2216 let config = create_test_config();
2217 let backend = LakeFSBackend::new(config).unwrap();
2218
2219 let result = backend.get_object_stats("").await;
2220 assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2221 }
2222
2223 #[tokio::test]
2224 async fn test_register_action_empty_name() {
2225 let config = create_test_config();
2226 let backend = LakeFSBackend::new(config).unwrap();
2227
2228 let action = LakeFSAction {
2229 name: "".to_string(),
2230 on: vec!["post-commit".to_string()],
2231 hooks: vec![ActionHook {
2232 id: "hook1".to_string(),
2233 hook_type: "webhook".to_string(),
2234 description: "test".to_string(),
2235 properties: HashMap::new(),
2236 }],
2237 };
2238
2239 let result = backend.register_action(&action).await;
2240 assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2241 }
2242
2243 #[tokio::test]
2244 async fn test_register_action_no_events() {
2245 let config = create_test_config();
2246 let backend = LakeFSBackend::new(config).unwrap();
2247
2248 let action = LakeFSAction {
2249 name: "test-action".to_string(),
2250 on: vec![],
2251 hooks: vec![ActionHook {
2252 id: "hook1".to_string(),
2253 hook_type: "webhook".to_string(),
2254 description: "test".to_string(),
2255 properties: HashMap::new(),
2256 }],
2257 };
2258
2259 let result = backend.register_action(&action).await;
2260 assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2261 }
2262
2263 #[tokio::test]
2264 async fn test_register_action_no_hooks() {
2265 let config = create_test_config();
2266 let backend = LakeFSBackend::new(config).unwrap();
2267
2268 let action = LakeFSAction {
2269 name: "test-action".to_string(),
2270 on: vec!["post-commit".to_string()],
2271 hooks: vec![],
2272 };
2273
2274 let result = backend.register_action(&action).await;
2275 assert!(matches!(result, Err(StorageError::InvalidQuery(_))));
2276 }
2277
2278 #[tokio::test]
2281 async fn test_build_action_yaml() {
2282 let config = create_test_config();
2283 let backend = LakeFSBackend::new(config).unwrap();
2284
2285 let mut props = HashMap::new();
2286 props.insert("url".to_string(), "http://localhost:9000/hook".to_string());
2287
2288 let action = LakeFSAction {
2289 name: "briefcase-post-commit".to_string(),
2290 on: vec!["post-commit".to_string()],
2291 hooks: vec![ActionHook {
2292 id: "notify".to_string(),
2293 hook_type: "webhook".to_string(),
2294 description: "Notify Briefcase server".to_string(),
2295 properties: props,
2296 }],
2297 };
2298
2299 let yaml = backend.build_action_yaml(&action);
2300 assert!(yaml.contains("name: briefcase-post-commit"));
2301 assert!(yaml.contains("post-commit:"));
2302 assert!(yaml.contains("id: notify"));
2303 assert!(yaml.contains("type: webhook"));
2304 assert!(yaml.contains("url: http://localhost:9000/hook"));
2305 }
2306
2307 #[tokio::test]
2308 async fn test_build_action_yaml_multiple_events() {
2309 let config = create_test_config();
2310 let backend = LakeFSBackend::new(config).unwrap();
2311
2312 let action = LakeFSAction {
2313 name: "multi-event".to_string(),
2314 on: vec!["pre-commit".to_string(), "post-merge".to_string()],
2315 hooks: vec![ActionHook {
2316 id: "h1".to_string(),
2317 hook_type: "webhook".to_string(),
2318 description: "Hook".to_string(),
2319 properties: HashMap::new(),
2320 }],
2321 };
2322
2323 let yaml = backend.build_action_yaml(&action);
2324 assert!(yaml.contains("pre-commit:"));
2325 assert!(yaml.contains("post-merge:"));
2326 }
2327
2328 #[tokio::test]
2331 async fn test_flush_empty_pending() {
2332 let config = create_test_config();
2333 let backend = LakeFSBackend::new(config).unwrap();
2334
2335 let result = backend.flush().await.unwrap();
2336 assert_eq!(result.snapshots_written, 0);
2337 assert_eq!(result.bytes_written, 0);
2338 assert!(result.checkpoint_id.is_none());
2339 }
2340
2341 #[tokio::test]
2344 async fn test_create_repository_success() {
2345 let mock_server = MockServer::start().await;
2346 let config = create_test_config_with_endpoint(&mock_server.uri());
2347 let backend = LakeFSBackend::new(config).unwrap();
2348
2349 Mock::given(method("POST"))
2350 .and(path("/api/v1/repositories"))
2351 .respond_with(ResponseTemplate::new(201).set_body_json(json!({
2352 "id": "my-engagement",
2353 "storage_namespace": "s3://my-bucket/engagement",
2354 "default_branch": "main",
2355 "creation_date": 1700000000
2356 })))
2357 .expect(1)
2358 .mount(&mock_server)
2359 .await;
2360
2361 let result = backend
2362 .create_repository("my-engagement", "s3://my-bucket/engagement", None)
2363 .await
2364 .unwrap();
2365
2366 assert_eq!(result.id, "my-engagement");
2367 assert_eq!(result.storage_namespace, "s3://my-bucket/engagement");
2368 assert_eq!(result.default_branch, "main");
2369 }
2370
2371 #[tokio::test]
2372 async fn test_create_repository_conflict() {
2373 let mock_server = MockServer::start().await;
2374 let config = create_test_config_with_endpoint(&mock_server.uri());
2375 let backend = LakeFSBackend::new(config).unwrap();
2376
2377 Mock::given(method("POST"))
2378 .and(path("/api/v1/repositories"))
2379 .respond_with(ResponseTemplate::new(409).set_body_json(json!({
2380 "message": "repository already exists"
2381 })))
2382 .expect(1)
2383 .mount(&mock_server)
2384 .await;
2385
2386 let result = backend
2387 .create_repository("existing-repo", "s3://bucket", None)
2388 .await;
2389
2390 assert!(result.is_err());
2391 let err = result.unwrap_err();
2392 assert!(
2393 matches!(err, StorageError::ConnectionError(msg) if msg.contains("already exists"))
2394 );
2395 }
2396
2397 #[tokio::test]
2398 async fn test_create_repository_custom_branch() {
2399 let mock_server = MockServer::start().await;
2400 let config = create_test_config_with_endpoint(&mock_server.uri());
2401 let backend = LakeFSBackend::new(config).unwrap();
2402
2403 Mock::given(method("POST"))
2404 .and(path("/api/v1/repositories"))
2405 .respond_with(ResponseTemplate::new(201).set_body_json(json!({
2406 "id": "my-repo",
2407 "storage_namespace": "s3://bucket",
2408 "default_branch": "develop",
2409 "creation_date": 1700000000
2410 })))
2411 .expect(1)
2412 .mount(&mock_server)
2413 .await;
2414
2415 let result = backend
2416 .create_repository("my-repo", "s3://bucket", Some("develop"))
2417 .await
2418 .unwrap();
2419
2420 assert_eq!(result.default_branch, "develop");
2421 }
2422
2423 #[tokio::test]
2424 async fn test_delete_repository_success() {
2425 let mock_server = MockServer::start().await;
2426 let config = create_test_config_with_endpoint(&mock_server.uri());
2427 let backend = LakeFSBackend::new(config).unwrap();
2428
2429 Mock::given(method("DELETE"))
2430 .and(path("/api/v1/repositories/test-repo"))
2431 .respond_with(ResponseTemplate::new(204))
2432 .expect(1)
2433 .mount(&mock_server)
2434 .await;
2435
2436 backend.delete_repository("test-repo").await.unwrap();
2437 }
2438
2439 #[tokio::test]
2440 async fn test_delete_repository_not_found() {
2441 let mock_server = MockServer::start().await;
2442 let config = create_test_config_with_endpoint(&mock_server.uri());
2443 let backend = LakeFSBackend::new(config).unwrap();
2444
2445 Mock::given(method("DELETE"))
2446 .and(path("/api/v1/repositories/nonexistent"))
2447 .respond_with(ResponseTemplate::new(404))
2448 .expect(1)
2449 .mount(&mock_server)
2450 .await;
2451
2452 let result = backend.delete_repository("nonexistent").await;
2453 assert!(matches!(result, Err(StorageError::NotFound(_))));
2454 }
2455
2456 #[tokio::test]
2457 async fn test_create_branch_success() {
2458 let mock_server = MockServer::start().await;
2459 let config = create_test_config_with_endpoint(&mock_server.uri());
2460 let backend = LakeFSBackend::new(config).unwrap();
2461
2462 Mock::given(method("POST"))
2463 .and(path("/api/v1/repositories/briefcase-test/branches"))
2464 .respond_with(ResponseTemplate::new(201).set_body_string("\"abc123def456\""))
2465 .expect(1)
2466 .mount(&mock_server)
2467 .await;
2468
2469 let commit_id = backend.create_branch("feature-x", "main").await.unwrap();
2470 assert_eq!(commit_id, "abc123def456");
2471 }
2472
2473 #[tokio::test]
2474 async fn test_create_branch_conflict() {
2475 let mock_server = MockServer::start().await;
2476 let config = create_test_config_with_endpoint(&mock_server.uri());
2477 let backend = LakeFSBackend::new(config).unwrap();
2478
2479 Mock::given(method("POST"))
2480 .and(path("/api/v1/repositories/briefcase-test/branches"))
2481 .respond_with(ResponseTemplate::new(409))
2482 .expect(1)
2483 .mount(&mock_server)
2484 .await;
2485
2486 let result = backend.create_branch("existing-branch", "main").await;
2487 assert!(result.is_err());
2488 }
2489
2490 #[tokio::test]
2491 async fn test_delete_branch_success() {
2492 let mock_server = MockServer::start().await;
2493 let config = create_test_config_with_endpoint(&mock_server.uri());
2494 let backend = LakeFSBackend::new(config).unwrap();
2495
2496 Mock::given(method("DELETE"))
2497 .and(path(
2498 "/api/v1/repositories/briefcase-test/branches/feature-x",
2499 ))
2500 .respond_with(ResponseTemplate::new(204))
2501 .expect(1)
2502 .mount(&mock_server)
2503 .await;
2504
2505 backend.delete_branch("feature-x").await.unwrap();
2506 }
2507
2508 #[tokio::test]
2509 async fn test_delete_branch_not_found() {
2510 let mock_server = MockServer::start().await;
2511 let config = create_test_config_with_endpoint(&mock_server.uri());
2512 let backend = LakeFSBackend::new(config).unwrap();
2513
2514 Mock::given(method("DELETE"))
2515 .and(path(
2516 "/api/v1/repositories/briefcase-test/branches/nonexistent",
2517 ))
2518 .respond_with(ResponseTemplate::new(404))
2519 .expect(1)
2520 .mount(&mock_server)
2521 .await;
2522
2523 let result = backend.delete_branch("nonexistent").await;
2524 assert!(matches!(result, Err(StorageError::NotFound(_))));
2525 }
2526
2527 #[tokio::test]
2528 async fn test_merge_branch_success() {
2529 let mock_server = MockServer::start().await;
2530 let config = create_test_config_with_endpoint(&mock_server.uri());
2531 let backend = LakeFSBackend::new(config).unwrap();
2532
2533 Mock::given(method("POST"))
2534 .and(path(
2535 "/api/v1/repositories/briefcase-test/refs/feature-x/merge/main",
2536 ))
2537 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2538 "reference": "merge-commit-abc123",
2539 "summary": {"added": 3, "removed": 0, "changed": 1}
2540 })))
2541 .expect(1)
2542 .mount(&mock_server)
2543 .await;
2544
2545 let result = backend
2546 .merge_branch("feature-x", "main", Some("Merge feature-x"))
2547 .await
2548 .unwrap();
2549
2550 assert_eq!(result.commit_id, "merge-commit-abc123");
2551 assert!(result.summary.contains("added"));
2552 }
2553
2554 #[tokio::test]
2555 async fn test_merge_branch_conflict() {
2556 let mock_server = MockServer::start().await;
2557 let config = create_test_config_with_endpoint(&mock_server.uri());
2558 let backend = LakeFSBackend::new(config).unwrap();
2559
2560 Mock::given(method("POST"))
2561 .and(path(
2562 "/api/v1/repositories/briefcase-test/refs/feature-x/merge/main",
2563 ))
2564 .respond_with(ResponseTemplate::new(409).set_body_json(json!({
2565 "message": "conflict"
2566 })))
2567 .expect(1)
2568 .mount(&mock_server)
2569 .await;
2570
2571 let result = backend.merge_branch("feature-x", "main", None).await;
2572 assert!(result.is_err());
2573 let err = result.unwrap_err();
2574 assert!(matches!(err, StorageError::ConnectionError(msg) if msg.contains("conflict")));
2575 }
2576
2577 #[tokio::test]
2578 async fn test_merge_branch_default_message() {
2579 let mock_server = MockServer::start().await;
2580 let config = create_test_config_with_endpoint(&mock_server.uri());
2581 let backend = LakeFSBackend::new(config).unwrap();
2582
2583 Mock::given(method("POST"))
2584 .and(path(
2585 "/api/v1/repositories/briefcase-test/refs/src/merge/dst",
2586 ))
2587 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2588 "reference": "commit-xyz",
2589 "summary": {}
2590 })))
2591 .expect(1)
2592 .mount(&mock_server)
2593 .await;
2594
2595 let result = backend.merge_branch("src", "dst", None).await.unwrap();
2596 assert_eq!(result.commit_id, "commit-xyz");
2597 }
2598
2599 #[tokio::test]
2600 async fn test_get_commit_success() {
2601 let mock_server = MockServer::start().await;
2602 let config = create_test_config_with_endpoint(&mock_server.uri());
2603 let backend = LakeFSBackend::new(config).unwrap();
2604
2605 Mock::given(method("GET"))
2606 .and(path("/api/v1/repositories/briefcase-test/commits/abc123"))
2607 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2608 "id": "abc123",
2609 "message": "Initial commit",
2610 "committer": "admin",
2611 "metadata": {"source": "briefcase-ai"},
2612 "creation_date": 1700000000
2613 })))
2614 .expect(1)
2615 .mount(&mock_server)
2616 .await;
2617
2618 let commit = backend.get_commit("abc123").await.unwrap();
2619 assert_eq!(commit.id, "abc123");
2620 assert_eq!(commit.message, "Initial commit");
2621 assert_eq!(commit.committer, "admin");
2622 assert_eq!(
2623 commit.metadata.get("source"),
2624 Some(&"briefcase-ai".to_string())
2625 );
2626 }
2627
2628 #[tokio::test]
2629 async fn test_get_commit_not_found() {
2630 let mock_server = MockServer::start().await;
2631 let config = create_test_config_with_endpoint(&mock_server.uri());
2632 let backend = LakeFSBackend::new(config).unwrap();
2633
2634 Mock::given(method("GET"))
2635 .and(path(
2636 "/api/v1/repositories/briefcase-test/commits/nonexistent",
2637 ))
2638 .respond_with(ResponseTemplate::new(404))
2639 .expect(1)
2640 .mount(&mock_server)
2641 .await;
2642
2643 let result = backend.get_commit("nonexistent").await;
2644 assert!(matches!(result, Err(StorageError::NotFound(_))));
2645 }
2646
2647 #[tokio::test]
2648 async fn test_diff_refs_success() {
2649 let mock_server = MockServer::start().await;
2650 let config = create_test_config_with_endpoint(&mock_server.uri());
2651 let backend = LakeFSBackend::new(config).unwrap();
2652
2653 Mock::given(method("GET"))
2654 .and(path(
2655 "/api/v1/repositories/briefcase-test/refs/main/diff/feature-x",
2656 ))
2657 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2658 "results": [
2659 {"path": "snapshots/a.json", "type": "added", "size_bytes": 1024},
2660 {"path": "snapshots/b.json", "type": "changed", "size_bytes": 2048},
2661 {"path": "decisions/old.json", "type": "removed"}
2662 ]
2663 })))
2664 .expect(1)
2665 .mount(&mock_server)
2666 .await;
2667
2668 let diffs = backend.diff_refs("main", "feature-x").await.unwrap();
2669 assert_eq!(diffs.len(), 3);
2670 assert_eq!(diffs[0].path, "snapshots/a.json");
2671 assert_eq!(diffs[0].diff_type, "added");
2672 assert_eq!(diffs[0].size_bytes, Some(1024));
2673 assert_eq!(diffs[1].diff_type, "changed");
2674 assert_eq!(diffs[2].diff_type, "removed");
2675 assert_eq!(diffs[2].size_bytes, None);
2676 }
2677
2678 #[tokio::test]
2679 async fn test_diff_refs_empty_result() {
2680 let mock_server = MockServer::start().await;
2681 let config = create_test_config_with_endpoint(&mock_server.uri());
2682 let backend = LakeFSBackend::new(config).unwrap();
2683
2684 Mock::given(method("GET"))
2685 .and(path(
2686 "/api/v1/repositories/briefcase-test/refs/main/diff/main",
2687 ))
2688 .respond_with(ResponseTemplate::new(200).set_body_json(json!({"results": []})))
2689 .expect(1)
2690 .mount(&mock_server)
2691 .await;
2692
2693 let diffs = backend.diff_refs("main", "main").await.unwrap();
2694 assert!(diffs.is_empty());
2695 }
2696
2697 #[tokio::test]
2698 async fn test_get_object_stats_success() {
2699 let mock_server = MockServer::start().await;
2700 let config = create_test_config_with_endpoint(&mock_server.uri());
2701 let backend = LakeFSBackend::new(config).unwrap();
2702
2703 Mock::given(method("GET"))
2704 .and(path(
2705 "/api/v1/repositories/briefcase-test/refs/main/objects/stat",
2706 ))
2707 .and(query_param("path", "snapshots/test.json"))
2708 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2709 "path": "snapshots/test.json",
2710 "physical_address": "s3://bucket/data/abc",
2711 "size_bytes": 4096,
2712 "content_type": "application/json",
2713 "mtime": 1700000000,
2714 "checksum": "sha256:abcdef"
2715 })))
2716 .expect(1)
2717 .mount(&mock_server)
2718 .await;
2719
2720 let stats = backend
2721 .get_object_stats("snapshots/test.json")
2722 .await
2723 .unwrap();
2724
2725 assert_eq!(stats.path, "snapshots/test.json");
2726 assert_eq!(stats.size_bytes, 4096);
2727 assert_eq!(stats.content_type, "application/json");
2728 assert_eq!(stats.checksum, "sha256:abcdef");
2729 }
2730
2731 #[tokio::test]
2732 async fn test_get_object_stats_not_found() {
2733 let mock_server = MockServer::start().await;
2734 let config = create_test_config_with_endpoint(&mock_server.uri());
2735 let backend = LakeFSBackend::new(config).unwrap();
2736
2737 Mock::given(method("GET"))
2738 .and(path(
2739 "/api/v1/repositories/briefcase-test/refs/main/objects/stat",
2740 ))
2741 .respond_with(ResponseTemplate::new(404))
2742 .expect(1)
2743 .mount(&mock_server)
2744 .await;
2745
2746 let result = backend.get_object_stats("nonexistent.json").await;
2747 assert!(matches!(result, Err(StorageError::NotFound(_))));
2748 }
2749
2750 #[tokio::test]
2751 async fn test_delete_object_success() {
2752 let mock_server = MockServer::start().await;
2753 let config = create_test_config_with_endpoint(&mock_server.uri());
2754 let backend = LakeFSBackend::new(config).unwrap();
2755
2756 Mock::given(method("DELETE"))
2757 .and(path(
2758 "/api/v1/repositories/briefcase-test/branches/main/objects",
2759 ))
2760 .and(query_param("path", "snapshots/old.json"))
2761 .respond_with(ResponseTemplate::new(204))
2762 .expect(1)
2763 .mount(&mock_server)
2764 .await;
2765
2766 backend.delete_object("snapshots/old.json").await.unwrap();
2767 }
2768
2769 #[tokio::test]
2770 async fn test_delete_object_not_found() {
2771 let mock_server = MockServer::start().await;
2772 let config = create_test_config_with_endpoint(&mock_server.uri());
2773 let backend = LakeFSBackend::new(config).unwrap();
2774
2775 Mock::given(method("DELETE"))
2776 .and(path(
2777 "/api/v1/repositories/briefcase-test/branches/main/objects",
2778 ))
2779 .respond_with(ResponseTemplate::new(404))
2780 .expect(1)
2781 .mount(&mock_server)
2782 .await;
2783
2784 let result = backend.delete_object("nonexistent.json").await;
2785 assert!(matches!(result, Err(StorageError::NotFound(_))));
2786 }
2787
2788 #[tokio::test]
2789 async fn test_upload_object_success() {
2790 let mock_server = MockServer::start().await;
2795 let config = create_test_config_with_endpoint(&mock_server.uri());
2796 let backend = LakeFSBackend::new(config).unwrap();
2797
2798 let presigned_s3_url = format!("{}/fake-s3/test-file", mock_server.uri());
2801 Mock::given(method("GET"))
2802 .and(path(
2803 "/api/v1/repositories/briefcase-test/branches/main/staging/backing",
2804 ))
2805 .and(query_param("path", "test/file.json"))
2806 .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
2807 "physical_address": "s3://briefcase-test-bucket/test-file",
2808 "presigned_url": presigned_s3_url,
2809 "presigned_url_expiry": 9999999999i64
2810 })))
2811 .expect(1)
2812 .mount(&mock_server)
2813 .await;
2814
2815 Mock::given(method("PUT"))
2817 .and(path("/fake-s3/test-file"))
2818 .respond_with(ResponseTemplate::new(200).append_header("etag", "\"abc123def456\""))
2819 .expect(1)
2820 .mount(&mock_server)
2821 .await;
2822
2823 Mock::given(method("PUT"))
2825 .and(path(
2826 "/api/v1/repositories/briefcase-test/branches/main/objects",
2827 ))
2828 .and(query_param("path", "test/file.json"))
2829 .respond_with(ResponseTemplate::new(200))
2830 .expect(1)
2831 .mount(&mock_server)
2832 .await;
2833
2834 backend
2835 .upload_object("test/file.json", b"hello world")
2836 .await
2837 .unwrap();
2838 }
2839
2840 #[tokio::test]
2841 async fn test_upload_object_server_error() {
2842 let mock_server = MockServer::start().await;
2843 let config = create_test_config_with_endpoint(&mock_server.uri());
2844 let backend = LakeFSBackend::new(config).unwrap();
2845
2846 Mock::given(method("GET"))
2848 .and(path(
2849 "/api/v1/repositories/briefcase-test/branches/main/staging/backing",
2850 ))
2851 .respond_with(ResponseTemplate::new(500).set_body_string("Internal Server Error"))
2852 .expect(1)
2853 .mount(&mock_server)
2854 .await;
2855
2856 let result = backend.upload_object("test/file.json", b"data").await;
2857 assert!(result.is_err());
2858 }
2859
2860 #[tokio::test]
2861 async fn test_download_object_success() {
2862 let mock_server = MockServer::start().await;
2863 let config = create_test_config_with_endpoint(&mock_server.uri());
2864 let backend = LakeFSBackend::new(config).unwrap();
2865
2866 Mock::given(method("GET"))
2867 .and(path(
2868 "/api/v1/repositories/briefcase-test/refs/main/objects",
2869 ))
2870 .and(query_param("path", "test/file.json"))
2871 .respond_with(ResponseTemplate::new(200).set_body_bytes(b"file content here".to_vec()))
2872 .expect(1)
2873 .mount(&mock_server)
2874 .await;
2875
2876 let data = backend.download_object("test/file.json").await.unwrap();
2877 assert_eq!(data, b"file content here");
2878 }
2879
2880 #[tokio::test]
2881 async fn test_download_object_not_found() {
2882 let mock_server = MockServer::start().await;
2883 let config = create_test_config_with_endpoint(&mock_server.uri());
2884 let backend = LakeFSBackend::new(config).unwrap();
2885
2886 Mock::given(method("GET"))
2887 .and(path(
2888 "/api/v1/repositories/briefcase-test/refs/main/objects",
2889 ))
2890 .respond_with(ResponseTemplate::new(404))
2891 .expect(1)
2892 .mount(&mock_server)
2893 .await;
2894
2895 let result = backend.download_object("missing.json").await;
2896 assert!(matches!(result, Err(StorageError::NotFound(_))));
2897 }
2898
2899 #[tokio::test]
2900 async fn test_create_commit_success() {
2901 let mock_server = MockServer::start().await;
2902 let config = create_test_config_with_endpoint(&mock_server.uri());
2903 let backend = LakeFSBackend::new(config).unwrap();
2904
2905 Mock::given(method("POST"))
2906 .and(path(
2907 "/api/v1/repositories/briefcase-test/branches/main/commits",
2908 ))
2909 .respond_with(ResponseTemplate::new(201).set_body_json(json!({
2910 "id": "commit-sha-xyz"
2911 })))
2912 .expect(1)
2913 .mount(&mock_server)
2914 .await;
2915
2916 let id = backend.create_commit("Test commit").await.unwrap();
2917 assert_eq!(id, "commit-sha-xyz");
2918 }
2919
2920 #[tokio::test]
2921 async fn test_list_objects_success() {
2922 let mock_server = MockServer::start().await;
2923 let config = create_test_config_with_endpoint(&mock_server.uri());
2924 let backend = LakeFSBackend::new(config).unwrap();
2925
2926 Mock::given(method("GET"))
2927 .and(path(
2928 "/api/v1/repositories/briefcase-test/refs/main/objects/ls",
2929 ))
2930 .and(query_param("prefix", "snapshots/"))
2931 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2932 "results": [
2933 {"path": "snapshots/a.json", "type": "object"},
2934 {"path": "snapshots/b.json", "type": "object"},
2935 {"path": "snapshots/", "type": "common_prefix"}
2936 ]
2937 })))
2938 .expect(1)
2939 .mount(&mock_server)
2940 .await;
2941
2942 let paths = backend.list_objects("snapshots/").await.unwrap();
2943 assert_eq!(paths.len(), 2);
2944 assert!(paths.contains(&"snapshots/a.json".to_string()));
2945 assert!(paths.contains(&"snapshots/b.json".to_string()));
2946 }
2947
2948 #[tokio::test]
2949 async fn test_health_check_healthy() {
2950 let mock_server = MockServer::start().await;
2951 let config = create_test_config_with_endpoint(&mock_server.uri());
2952 let backend = LakeFSBackend::new(config).unwrap();
2953
2954 Mock::given(method("GET"))
2955 .and(path("/api/v1/repositories/briefcase-test"))
2956 .respond_with(ResponseTemplate::new(200))
2957 .expect(1)
2958 .mount(&mock_server)
2959 .await;
2960
2961 assert!(backend.health_check().await.unwrap());
2962 }
2963
2964 #[tokio::test]
2965 async fn test_health_check_unhealthy() {
2966 let mock_server = MockServer::start().await;
2967 let config = create_test_config_with_endpoint(&mock_server.uri());
2968 let backend = LakeFSBackend::new(config).unwrap();
2969
2970 Mock::given(method("GET"))
2971 .and(path("/api/v1/repositories/briefcase-test"))
2972 .respond_with(ResponseTemplate::new(503))
2973 .expect(1)
2974 .mount(&mock_server)
2975 .await;
2976
2977 assert!(!backend.health_check().await.unwrap());
2978 }
2979
2980 #[tokio::test]
2981 async fn test_delete_via_trait_success() {
2982 let mock_server = MockServer::start().await;
2985 let config = create_test_config_with_endpoint(&mock_server.uri());
2986 let backend = LakeFSBackend::new(config).unwrap();
2987
2988 Mock::given(method("GET"))
2990 .and(path(
2991 "/api/v1/repositories/briefcase-test/refs/main/objects/stat",
2992 ))
2993 .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
2994 "path": "snapshots/some-snapshot-id.json",
2995 "physical_address": "s3://bucket/some-snapshot-id",
2996 "size_bytes": 512,
2997 "content_type": "application/json",
2998 "mtime": 1700000000,
2999 "checksum": "abc123"
3000 })))
3001 .expect(1)
3002 .mount(&mock_server)
3003 .await;
3004
3005 Mock::given(method("DELETE"))
3007 .and(path(
3008 "/api/v1/repositories/briefcase-test/branches/main/objects",
3009 ))
3010 .respond_with(ResponseTemplate::new(204))
3011 .expect(1)
3012 .mount(&mock_server)
3013 .await;
3014
3015 let deleted = backend.delete("some-snapshot-id").await.unwrap();
3016 assert!(deleted);
3017 }
3018
3019 #[tokio::test]
3020 async fn test_delete_via_trait_not_found() {
3021 let mock_server = MockServer::start().await;
3022 let config = create_test_config_with_endpoint(&mock_server.uri());
3023 let backend = LakeFSBackend::new(config).unwrap();
3024
3025 Mock::given(method("GET"))
3027 .and(path(
3028 "/api/v1/repositories/briefcase-test/refs/main/objects/stat",
3029 ))
3030 .respond_with(ResponseTemplate::new(404))
3031 .expect(1)
3032 .mount(&mock_server)
3033 .await;
3034
3035 let deleted = backend.delete("nonexistent").await.unwrap();
3037 assert!(!deleted);
3038 }
3039
3040 #[test]
3046 fn test_lakefs_auth_enum_basic() {
3047 let auth = LakeFSAuth::Basic {
3048 access_key: "ak".into(),
3049 secret_key: "sk".into(),
3050 };
3051 match auth {
3052 LakeFSAuth::Basic {
3053 access_key,
3054 secret_key,
3055 } => {
3056 assert_eq!(access_key, "ak");
3057 assert_eq!(secret_key, "sk");
3058 }
3059 _ => panic!("Expected Basic"),
3060 }
3061 }
3062
3063 #[test]
3064 fn test_lakefs_auth_enum_all_variants() {
3065 let _basic = LakeFSAuth::Basic {
3067 access_key: "a".into(),
3068 secret_key: "s".into(),
3069 };
3070 let _token = LakeFSAuth::Token {
3071 access_key: "a".into(),
3072 secret_key: "s".into(),
3073 };
3074 let _sts = LakeFSAuth::Sts {
3075 oidc_token: "tok".into(),
3076 };
3077 let _iam = LakeFSAuth::Iam;
3078 let _oidc = LakeFSAuth::Oidc {
3079 token: "tok".into(),
3080 };
3081 let _saml = LakeFSAuth::Saml {
3082 assertion: "xml".into(),
3083 };
3084 }
3085
3086 #[test]
3087 fn test_lakefs_config_with_auth() {
3088 let config = LakeFSConfig::new("http://localhost", "repo", "main", "ak", "sk").with_auth(
3089 LakeFSAuth::Token {
3090 access_key: "ak2".into(),
3091 secret_key: "sk2".into(),
3092 },
3093 );
3094 assert!(config.auth.is_some());
3095 match config.auth.unwrap() {
3096 LakeFSAuth::Token {
3097 access_key,
3098 secret_key,
3099 } => {
3100 assert_eq!(access_key, "ak2");
3101 assert_eq!(secret_key, "sk2");
3102 }
3103 _ => panic!("Expected Token"),
3104 }
3105 }
3106
3107 #[test]
3108 fn test_lakefs_config_without_auth_defaults_to_none() {
3109 let config = LakeFSConfig::new("http://localhost", "repo", "main", "ak", "sk");
3110 assert!(config.auth.is_none());
3111 }
3112
3113 #[tokio::test]
3114 async fn test_basic_auth_provider_header() {
3115 let provider = BasicAuthProvider::new(
3116 "AKIAIOSFODNN7EXAMPLE",
3117 "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
3118 )
3119 .unwrap();
3120 let header = provider.auth_header().await.unwrap();
3121 let header_str = header.to_str().unwrap();
3122 assert!(header_str.starts_with("Basic "));
3123 let b64_part = &header_str["Basic ".len()..];
3125 let decoded = general_purpose::STANDARD.decode(b64_part).unwrap();
3126 let decoded_str = String::from_utf8(decoded).unwrap();
3127 assert_eq!(
3128 decoded_str,
3129 "AKIAIOSFODNN7EXAMPLE:wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
3130 );
3131 }
3132
3133 #[tokio::test]
3134 async fn test_basic_auth_provider_mode_name() {
3135 let provider = BasicAuthProvider::new("ak", "sk").unwrap();
3136 assert_eq!(provider.mode_name(), "basic");
3137 }
3138
3139 #[tokio::test]
3140 async fn test_token_auth_provider_login_success() {
3141 let mock_server = MockServer::start().await;
3142
3143 Mock::given(method("POST"))
3145 .and(path("/api/v1/auth/login"))
3146 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
3147 "token": "eyJhbGciOiJIUzI1NiJ9.test",
3148 "token_expiration": 9999999999u64
3149 })))
3150 .expect(1)
3151 .mount(&mock_server)
3152 .await;
3153
3154 let provider = TokenAuthProvider::new(&mock_server.uri(), "ak", "sk");
3155 let header = provider.auth_header().await.unwrap();
3156 let header_str = header.to_str().unwrap();
3157 assert_eq!(header_str, "Bearer eyJhbGciOiJIUzI1NiJ9.test");
3158 assert_eq!(provider.mode_name(), "token");
3159 }
3160
3161 #[tokio::test]
3162 async fn test_token_auth_provider_caches_token() {
3163 let mock_server = MockServer::start().await;
3164
3165 Mock::given(method("POST"))
3166 .and(path("/api/v1/auth/login"))
3167 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
3168 "token": "cached-token",
3169 "token_expiration": 9999999999u64
3170 })))
3171 .expect(1) .mount(&mock_server)
3173 .await;
3174
3175 let provider = TokenAuthProvider::new(&mock_server.uri(), "ak", "sk");
3176
3177 let h1 = provider.auth_header().await.unwrap();
3179 let h2 = provider.auth_header().await.unwrap();
3181 assert_eq!(h1, h2);
3182 }
3183
3184 #[tokio::test]
3185 async fn test_token_auth_provider_login_failure() {
3186 let mock_server = MockServer::start().await;
3187
3188 Mock::given(method("POST"))
3189 .and(path("/api/v1/auth/login"))
3190 .respond_with(ResponseTemplate::new(401).set_body_json(json!({
3191 "message": "invalid credentials"
3192 })))
3193 .expect(1)
3194 .mount(&mock_server)
3195 .await;
3196
3197 let provider = TokenAuthProvider::new(&mock_server.uri(), "bad", "creds");
3198 let result = provider.auth_header().await;
3199 assert!(result.is_err());
3200 let err_msg = format!("{}", result.unwrap_err());
3201 assert!(err_msg.contains("401"));
3202 }
3203
3204 #[tokio::test]
3205 async fn test_sts_auth_provider_success() {
3206 let mock_server = MockServer::start().await;
3207
3208 Mock::given(method("POST"))
3209 .and(path("/sts/login"))
3210 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
3211 "Credentials": {
3212 "AccessKeyId": "ASIATEMP",
3213 "SecretAccessKey": "tempSecret",
3214 "SessionToken": "FwoGZXIvY...",
3215 "Expiration": 9999999999u64
3216 }
3217 })))
3218 .expect(1)
3219 .mount(&mock_server)
3220 .await;
3221
3222 let provider = StsAuthProvider::new(&mock_server.uri(), "my-oidc-token");
3223 let header = provider.auth_header().await.unwrap();
3224 let header_str = header.to_str().unwrap();
3225 assert!(header_str.starts_with("Basic "));
3226 let b64_part = &header_str["Basic ".len()..];
3228 let decoded = general_purpose::STANDARD.decode(b64_part).unwrap();
3229 let decoded_str = String::from_utf8(decoded).unwrap();
3230 assert_eq!(decoded_str, "ASIATEMP:tempSecret");
3231 assert_eq!(provider.mode_name(), "sts");
3232 }
3233
3234 #[tokio::test]
3235 async fn test_oidc_auth_provider_success() {
3236 let mock_server = MockServer::start().await;
3237
3238 Mock::given(method("POST"))
3239 .and(path("/api/v1/oidc/login"))
3240 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
3241 "token": "oidc-lakefs-jwt-token",
3242 "token_expiration": 9999999999u64
3243 })))
3244 .expect(1)
3245 .mount(&mock_server)
3246 .await;
3247
3248 let provider = OidcAuthProvider::new(&mock_server.uri(), "external-oidc-token");
3249 let header = provider.auth_header().await.unwrap();
3250 let header_str = header.to_str().unwrap();
3251 assert_eq!(header_str, "Bearer oidc-lakefs-jwt-token");
3252 assert_eq!(provider.mode_name(), "oidc");
3253 }
3254
3255 #[tokio::test]
3256 async fn test_saml_auth_provider_success() {
3257 let mock_server = MockServer::start().await;
3258
3259 Mock::given(method("POST"))
3260 .and(path("/api/v1/auth/external/saml"))
3261 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
3262 "token": "saml-lakefs-jwt-token",
3263 "token_expiration": 9999999999u64
3264 })))
3265 .expect(1)
3266 .mount(&mock_server)
3267 .await;
3268
3269 let provider = SamlAuthProvider::new(&mock_server.uri(), "PHNhbWxSZXNwb25zZT4=");
3270 let header = provider.auth_header().await.unwrap();
3271 let header_str = header.to_str().unwrap();
3272 assert_eq!(header_str, "Bearer saml-lakefs-jwt-token");
3273 assert_eq!(provider.mode_name(), "saml");
3274 }
3275
3276 #[tokio::test]
3277 async fn test_iam_auth_provider_mode_name() {
3278 let provider = IamAuthProvider::new();
3279 assert_eq!(provider.mode_name(), "iam");
3280 }
3281
3282 #[tokio::test]
3283 async fn test_build_auth_provider_basic() {
3284 let auth = LakeFSAuth::Basic {
3285 access_key: "ak".into(),
3286 secret_key: "sk".into(),
3287 };
3288 let provider = build_auth_provider(&auth, "http://localhost").unwrap();
3289 assert_eq!(provider.mode_name(), "basic");
3290 let header = provider.auth_header().await.unwrap();
3291 assert!(header.to_str().unwrap().starts_with("Basic "));
3292 }
3293
3294 #[tokio::test]
3295 async fn test_build_auth_provider_token() {
3296 let auth = LakeFSAuth::Token {
3297 access_key: "ak".into(),
3298 secret_key: "sk".into(),
3299 };
3300 let provider = build_auth_provider(&auth, "http://localhost").unwrap();
3301 assert_eq!(provider.mode_name(), "token");
3302 }
3303
3304 #[tokio::test]
3305 async fn test_build_auth_provider_iam() {
3306 let auth = LakeFSAuth::Iam;
3307 let provider = build_auth_provider(&auth, "http://localhost").unwrap();
3308 assert_eq!(provider.mode_name(), "iam");
3309 }
3310
3311 #[tokio::test]
3312 async fn test_backend_with_basic_auth_config() {
3313 let mock_server = MockServer::start().await;
3314
3315 let config = LakeFSConfig::new(
3316 mock_server.uri(),
3317 "test-repo",
3318 "main",
3319 "test_key",
3320 "test_secret",
3321 )
3322 .with_auth(LakeFSAuth::Basic {
3323 access_key: "test_key".into(),
3324 secret_key: "test_secret".into(),
3325 });
3326
3327 let backend = LakeFSBackend::new(config).unwrap();
3328 assert_eq!(backend.auth_mode(), "basic");
3329 }
3330
3331 #[tokio::test]
3332 async fn test_backend_with_token_auth_sends_bearer() {
3333 let mock_server = MockServer::start().await;
3334
3335 Mock::given(method("POST"))
3337 .and(path("/api/v1/auth/login"))
3338 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
3339 "token": "test-jwt-token",
3340 "token_expiration": 9999999999u64
3341 })))
3342 .mount(&mock_server)
3343 .await;
3344
3345 Mock::given(method("GET"))
3347 .and(path("/api/v1/repositories/test-repo"))
3348 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
3349 "id": "test-repo",
3350 "storage_namespace": "s3://test",
3351 "default_branch": "main"
3352 })))
3353 .mount(&mock_server)
3354 .await;
3355
3356 let config = LakeFSConfig::new(mock_server.uri(), "test-repo", "main", "ak", "sk")
3357 .with_auth(LakeFSAuth::Token {
3358 access_key: "ak".into(),
3359 secret_key: "sk".into(),
3360 });
3361
3362 let backend = LakeFSBackend::new(config).unwrap();
3363 assert_eq!(backend.auth_mode(), "token");
3364
3365 let health = backend.health_check().await.unwrap();
3367 assert!(health);
3368 }
3369
3370 #[tokio::test]
3371 async fn test_backend_default_auth_is_basic() {
3372 let config = LakeFSConfig::new("http://localhost:8000", "test-repo", "main", "ak", "sk");
3373 let backend = LakeFSBackend::new(config).unwrap();
3375 assert_eq!(backend.auth_mode(), "basic");
3376 }
3377}
3378
// Additional test module, compiled only for test builds that enable both the
// `async` and `networking` features. Its source is loaded from
// `lakefs_integration_tests.rs` via the `#[path]` attribute instead of the
// default module file lookup.
#[cfg(all(feature = "async", feature = "networking", test))]
#[path = "lakefs_integration_tests.rs"]
mod integration_tests;