1use chrono::{DateTime, Utc};
19use serde::{Deserialize, Serialize};
20use serde_json::Value;
21use thiserror::Error;
22
23use crate::EnvId;
24use crate::audit::{Actor, AuditDecision, AuditEvent};
25use crate::integrity::{IntegrityError, StateIntegrity};
26use crate::version::SchemaVersion;
27
/// Opaque version tag identifying one revision of a stored state document.
///
/// The constructors on this type fill it with a [`StateIntegrity`] digest, but the
/// inner string is public, so any value can be wrapped.
/// `#[serde(transparent)]` makes it (de)serialize as the bare inner string.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(transparent)]
pub struct StateEtag(pub String);
34
35impl StateEtag {
36 pub fn of<T: Serialize>(value: &T) -> Result<Self, IntegrityError> {
38 Ok(Self(StateIntegrity::sha256_of(value)?.digest))
39 }
40
41 pub fn from_integrity(integrity: &StateIntegrity) -> Self {
43 Self(integrity.digest.clone())
44 }
45
46 pub fn header_value(&self) -> String {
48 format!("\"{}\"", self.0)
49 }
50}
51
/// Optimistic-concurrency guard attached to a write.
///
/// Either pin may be omitted; an omitted pin is left out of the serialized form.
/// The `Default` value pins nothing, which `is_conditional` reports as unconditional.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct Precondition {
    /// Etag the caller expects the current state to carry (the `If-Match` pin).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub if_match: Option<StateEtag>,
    /// Generation number the caller expects the current state to be at.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub expected_generation: Option<u64>,
}
62
63impl Precondition {
64 pub fn matching(etag: StateEtag, generation: u64) -> Self {
66 Self {
67 if_match: Some(etag),
68 expected_generation: Some(generation),
69 }
70 }
71
72 pub fn is_conditional(&self) -> bool {
75 self.if_match.is_some() || self.expected_generation.is_some()
76 }
77
78 pub fn check(
87 &self,
88 current_etag: &StateEtag,
89 current_generation: u64,
90 ) -> Result<(), PreconditionError> {
91 if !self.is_conditional() {
92 return Err(PreconditionError::Required);
93 }
94 let etag_ok = self.if_match.as_ref().is_none_or(|e| e == current_etag);
95 let gen_ok = self
96 .expected_generation
97 .is_none_or(|g| g == current_generation);
98 if etag_ok && gen_ok {
99 Ok(())
100 } else {
101 Err(PreconditionError::Conflict(ConcurrencyConflict {
102 expected_etag: self.if_match.as_ref().map(|e| e.0.clone()),
103 actual_etag: current_etag.0.clone(),
104 expected_generation: self.expected_generation,
105 actual_generation: current_generation,
106 }))
107 }
108 }
109}
110
/// Reasons a [`Precondition`] check rejects a write.
#[derive(Debug, Clone, PartialEq, Eq, Error)]
pub enum PreconditionError {
    /// The precondition pinned neither an etag nor a generation.
    #[error("a conditional write must pin If-Match and/or expected generation")]
    Required,
    /// A pinned value did not match the current state; the payload carries both the
    /// expected and the actual values.
    #[error("precondition failed (stale generation/etag)")]
    Conflict(ConcurrencyConflict),
}
122
/// Expected-versus-actual details of a failed precondition.
///
/// The `expected_*` fields mirror the pins of the precondition that failed and are
/// omitted from the serialized form when the corresponding pin was not set.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ConcurrencyConflict {
    /// Etag string the caller pinned, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub expected_etag: Option<String>,
    /// Etag string of the state the check ran against.
    pub actual_etag: String,
    /// Generation the caller pinned, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub expected_generation: Option<u64>,
    /// Generation of the state the check ran against.
    pub actual_generation: u64,
}
133
/// Validated idempotency key.
///
/// The inner string is private. Deserialization is routed through `TryFrom<String>`
/// (`try_from = "String"`), so a deserialized key passes the same validation as a
/// constructed one; serialization emits the plain string (`into = "String"`).
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(try_from = "String", into = "String")]
pub struct IdempotencyKey(String);
139
140impl IdempotencyKey {
141 pub fn new(key: impl Into<String>) -> Result<Self, RemoteContractError> {
142 let key = key.into();
143 if key.trim().is_empty() {
144 return Err(RemoteContractError::EmptyIdempotencyKey);
145 }
146 Ok(Self(key))
147 }
148
149 pub fn as_str(&self) -> &str {
150 &self.0
151 }
152}
153
154impl TryFrom<String> for IdempotencyKey {
155 type Error = RemoteContractError;
156 fn try_from(value: String) -> Result<Self, Self::Error> {
157 Self::new(value)
158 }
159}
160
161impl From<IdempotencyKey> for String {
162 fn from(key: IdempotencyKey) -> Self {
163 key.0
164 }
165}
166
/// Stored result of a mutation, kept so a repeated request can be answered from the
/// record instead of being applied again.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IdempotencyRecord {
    /// Idempotency key this record belongs to.
    pub key: IdempotencyKey,
    /// Digest of the request that produced `response`; `match_request` compares an
    /// incoming fingerprint against it.
    pub request_fingerprint: String,
    /// Response handed back by reference when a request is recognised as a replay.
    pub response: MutationResponse,
    /// UTC timestamp of the record.
    /// NOTE(review): presumably the moment the record was persisted — confirm.
    pub stored_at: DateTime<Utc>,
}
185
186impl IdempotencyRecord {
187 pub fn fingerprint<T: Serialize>(request: &T) -> Result<String, IntegrityError> {
189 Ok(StateIntegrity::sha256_of(request)?.digest)
190 }
191
192 pub fn match_request(&self, incoming_fingerprint: &str) -> IdempotencyReplay<'_> {
196 if self.request_fingerprint == incoming_fingerprint {
197 IdempotencyReplay::Replay(&self.response)
198 } else {
199 IdempotencyReplay::Conflict {
200 reason: "idempotency key reused with a different request body".to_string(),
201 }
202 }
203 }
204}
205
/// Result of comparing an incoming request with a stored [`IdempotencyRecord`].
#[derive(Debug)]
pub enum IdempotencyReplay<'a> {
    /// Fingerprints matched; borrows the response stored in the record.
    Replay(&'a MutationResponse),
    /// Fingerprints differed; `reason` is a human-readable explanation.
    Conflict { reason: String },
}
216
/// Idempotency disposition reported in a [`MutationResponse`].
///
/// Internally tagged: the variant name is written in kebab-case under the
/// `idempotency` key (for example `{"idempotency": "applied"}`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "idempotency", rename_all = "kebab-case")]
pub enum IdempotencyOutcome {
    /// NOTE(review): presumably the mutation was executed by this request — confirm.
    Applied,
    /// NOTE(review): presumably the response was served from a stored idempotency
    /// record — confirm.
    Replayed,
}
229
/// Input for an authorization decision.
///
/// NOTE(review): nothing in this file consumes the type; the field descriptions below
/// are inferred from the field names and the like-named `AuditEvent` fields — confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RbacRequest {
    /// Who is asking.
    pub actor: Actor,
    /// Environment the request targets.
    pub env_id: EnvId,
    /// Kind of resource being acted on.
    pub noun: String,
    /// Action requested on the resource.
    pub verb: String,
    /// Free-form JSON describing the concrete target.
    pub target: Value,
}
240
/// Response describing the outcome of a mutation; also the payload stored in an
/// [`IdempotencyRecord`] for replay.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MutationResponse {
    /// Etag reported with the response.
    /// NOTE(review): presumably the etag of the state after the mutation — confirm.
    pub etag: StateEtag,
    /// Generation reported with the response.
    /// NOTE(review): presumably the generation after the mutation — confirm.
    pub generation: u64,
    /// Idempotency disposition of the request.
    pub idempotency: IdempotencyOutcome,
    /// Audit event attached to the response.
    pub audit: AuditEvent,
}
251
/// Metadata describing one backup.
///
/// NOTE(review): nothing in this file reads these fields; the descriptions below are
/// inferred from field names and types — confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackupManifest {
    /// Schema version of the manifest.
    pub schema: SchemaVersion,
    /// Identifier of the backup.
    pub backup_id: String,
    /// Environment the backup belongs to.
    pub env_id: EnvId,
    /// UTC creation timestamp.
    pub created_at: DateTime<Utc>,
    /// State generation recorded with the backup.
    pub generation: u64,
    /// Integrity record (digest) for the backup.
    pub integrity: StateIntegrity,
    /// Size in bytes.
    pub size_bytes: u64,
}
263
/// Request to restore a backup, guarded by a precondition.
///
/// `precondition` carries no serde default, so a payload that omits it fails to
/// deserialize.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RestoreRequest {
    /// Identifier of the backup to restore.
    pub backup_id: String,
    /// Guard that `validate` requires to pin at least one of etag or generation.
    pub precondition: Precondition,
}
276
277impl RestoreRequest {
278 pub fn validate(&self) -> Result<(), RemoteContractError> {
280 if !self.precondition.is_conditional() {
281 return Err(RemoteContractError::UnconditionalRestore);
282 }
283 Ok(())
284 }
285}
286
/// Result of a restore.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RestoreOutcome {
    /// Generation associated with the restore.
    /// NOTE(review): presumably the generation of the state after the restore — confirm.
    pub restored_generation: u64,
    /// Integrity record from which `etag` derives the outcome's etag.
    pub integrity: StateIntegrity,
}
295
296impl RestoreOutcome {
297 pub fn etag(&self) -> StateEtag {
299 StateEtag::from_integrity(&self.integrity)
300 }
301}
302
/// Serializable error surface of the remote store.
///
/// Internally tagged under `kind` with kebab-case variant names (for example
/// `"unauthorized"`). Each variant maps to an HTTP status via `http_status`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Error)]
#[serde(tag = "kind", rename_all = "kebab-case")]
pub enum RemoteStoreError {
    /// A pinned etag/generation was stale; carries expected and actual values (412).
    #[error("precondition failed (stale generation/etag)")]
    PreconditionFailed(ConcurrencyConflict),
    /// The write carried no usable precondition (428).
    #[error("precondition required: {detail}")]
    PreconditionRequired { detail: String },
    /// An idempotency key clashed with an earlier request (409).
    #[error("idempotency conflict: {reason}")]
    IdempotencyConflict { reason: String },
    /// Authorization was denied by `policy` for `reason` (403).
    #[error("unauthorized: {reason} (policy `{policy}`)")]
    Unauthorized { policy: String, reason: String },
    /// The environment does not exist (404).
    #[error("environment not found")]
    NotFound,
    /// The expected digest and the computed digest differ (422).
    #[error("integrity mismatch: expected {expected}, computed {actual}")]
    IntegrityMismatch { expected: String, actual: String },
    /// The operation is not implemented yet (501).
    #[error("not yet implemented: {detail}")]
    NotYetImplemented { detail: String },
    /// Any other store-side failure (500).
    #[error("internal store error: {message}")]
    Internal { message: String },
}
333
334impl RemoteStoreError {
335 pub fn http_status(&self) -> u16 {
337 match self {
338 Self::PreconditionFailed(_) => 412,
339 Self::PreconditionRequired { .. } => 428,
340 Self::IdempotencyConflict { .. } => 409,
341 Self::Unauthorized { .. } => 403,
342 Self::NotFound => 404,
343 Self::IntegrityMismatch { .. } => 422,
344 Self::NotYetImplemented { .. } => 501,
345 Self::Internal { .. } => 500,
346 }
347 }
348}
349
350impl From<PreconditionError> for RemoteStoreError {
351 fn from(err: PreconditionError) -> Self {
352 match err {
353 PreconditionError::Required => RemoteStoreError::PreconditionRequired {
354 detail: PreconditionError::Required.to_string(),
355 },
356 PreconditionError::Conflict(conflict) => RemoteStoreError::PreconditionFailed(conflict),
357 }
358 }
359}
360
361impl From<AuditDecision> for Result<(), RemoteStoreError> {
362 fn from(decision: AuditDecision) -> Self {
364 match decision {
365 AuditDecision::Allow { .. } => Ok(()),
366 AuditDecision::Deny { policy, reason } => {
367 Err(RemoteStoreError::Unauthorized { policy, reason })
368 }
369 }
370 }
371}
372
/// Validation failures raised while constructing or checking contract values, before
/// any store operation runs.
#[derive(Debug, Clone, PartialEq, Eq, Error)]
pub enum RemoteContractError {
    /// An idempotency key was empty or contained only whitespace.
    #[error("idempotency key must not be empty")]
    EmptyIdempotencyKey,
    /// A restore request carried a precondition that pins nothing.
    #[error("restore requires a precondition that pins prior state")]
    UnconditionalRestore,
}
380
// Unit tests for the contract types declared in this file.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::audit::AuditResult;

    /// Shorthand for wrapping a string literal in a `StateEtag`.
    fn etag(s: &str) -> StateEtag {
        StateEtag(s.to_string())
    }

    /// Builds a fully populated `MutationResponse` fixture with the given etag and
    /// generation; every other field is a fixed sample value.
    fn sample_response(etag_value: &str, generation: u64) -> MutationResponse {
        MutationResponse {
            etag: etag(etag_value),
            generation,
            idempotency: IdempotencyOutcome::Applied,
            audit: AuditEvent {
                schema: SchemaVersion::AUDIT_EVENT_V1.into(),
                event_id: "01JTKW5B4W4Q5Y1CQW93F7S5VH".to_string(),
                ts: "2026-05-20T00:00:00Z".parse().unwrap(),
                actor: Actor {
                    kind: "local-user".to_string(),
                    user: Some("tester".to_string()),
                    uid: Some(1000),
                },
                env_id: "local".to_string(),
                noun: "traffic".to_string(),
                verb: "set".to_string(),
                target: serde_json::json!({"env": "local"}),
                // saturating_sub keeps the fixture valid for generation 0.
                previous_generation: Some(generation.saturating_sub(1)),
                new_generation: Some(generation),
                idempotency_key: Some("k1".to_string()),
                authorization: AuditDecision::Allow {
                    policy: "local-only".to_string(),
                    reason: "ok".to_string(),
                },
                result: AuditResult::Ok,
            },
        }
    }

    /// The etag equals the integrity digest and the header form is the quoted etag.
    #[test]
    fn etag_derives_from_content_hash() {
        let resource = serde_json::json!({"generation": 1, "name": "local"});
        let tag = StateEtag::of(&resource).unwrap();
        assert_eq!(tag.0, StateIntegrity::sha256_of(&resource).unwrap().digest);
        assert_eq!(tag.header_value(), format!("\"{}\"", tag.0));
    }

    /// A precondition with no pins is an error (mapped to 428), not a pass.
    #[test]
    fn precondition_empty_is_rejected_not_blindly_passed() {
        assert!(!Precondition::default().is_conditional());
        let err = Precondition::default().check(&etag("abc"), 7).unwrap_err();
        assert_eq!(err, PreconditionError::Required);
        let mapped: RemoteStoreError = err.into();
        assert_eq!(mapped.http_status(), 428);
    }

    /// Both pins set and both equal to the current state: the check passes.
    #[test]
    fn precondition_matching_etag_and_generation_passes() {
        let pre = Precondition::matching(etag("abc"), 7);
        assert!(pre.is_conditional());
        assert!(pre.check(&etag("abc"), 7).is_ok());
    }

    /// A generation pin alone is enough; the unpinned etag is not compared.
    #[test]
    fn precondition_generation_only_is_conditional() {
        let pre = Precondition {
            if_match: None,
            expected_generation: Some(7),
        };
        assert!(pre.is_conditional());
        assert!(pre.check(&etag("anything"), 7).is_ok());
    }

    /// A stale generation yields a conflict that reports expected and actual values.
    #[test]
    fn precondition_stale_generation_conflicts() {
        let pre = Precondition::matching(etag("abc"), 6);
        let PreconditionError::Conflict(conflict) = pre.check(&etag("abc"), 7).unwrap_err() else {
            panic!("expected a conflict");
        };
        assert_eq!(conflict.expected_generation, Some(6));
        assert_eq!(conflict.actual_generation, 7);
    }

    /// A stale etag yields a conflict that reports expected and actual etags.
    #[test]
    fn precondition_stale_etag_conflicts() {
        let pre = Precondition::matching(etag("old"), 7);
        let PreconditionError::Conflict(conflict) = pre.check(&etag("new"), 7).unwrap_err() else {
            panic!("expected a conflict");
        };
        assert_eq!(conflict.expected_etag.as_deref(), Some("old"));
        assert_eq!(conflict.actual_etag, "new");
    }

    /// `validate` rejects a restore whose precondition pins nothing and accepts a
    /// pinned one.
    #[test]
    fn restore_request_requires_conditional_precondition() {
        let blind = RestoreRequest {
            backup_id: "b1".to_string(),
            precondition: Precondition::default(),
        };
        assert_eq!(
            blind.validate().unwrap_err(),
            RemoteContractError::UnconditionalRestore
        );

        let guarded = RestoreRequest {
            backup_id: "b1".to_string(),
            precondition: Precondition::matching(etag("abc"), 3),
        };
        assert!(guarded.validate().is_ok());
    }

    /// The `precondition` field has no serde default: omitting it is a parse error.
    #[test]
    fn restore_request_precondition_is_not_serde_defaulted() {
        let err = serde_json::from_str::<RestoreRequest>(r#"{"backup_id":"b1"}"#);
        // `err` holds the whole parse result; it must be the `Err` variant.
        assert!(
            err.is_err(),
            "missing precondition must fail to deserialize"
        );
    }

    /// Whitespace-only keys count as empty; a normal key round-trips through `as_str`.
    #[test]
    fn idempotency_key_rejects_empty() {
        assert!(IdempotencyKey::new(" ").is_err());
        assert_eq!(IdempotencyKey::new("k1").unwrap().as_str(), "k1");
    }

    /// Deserialization runs the same validation as the constructor.
    #[test]
    fn idempotency_key_deserializes_through_validation() {
        assert!(serde_json::from_str::<IdempotencyKey>("\"\"").is_err());
        let key: IdempotencyKey = serde_json::from_str("\"01JABC\"").unwrap();
        assert_eq!(key.as_str(), "01JABC");
    }

    /// Same body under the same key replays; a different body conflicts.
    #[test]
    fn idempotency_same_body_replays_different_body_conflicts() {
        let body = serde_json::json!({"split": [{"rev": "a", "bps": 10000}]});
        let record = IdempotencyRecord {
            key: IdempotencyKey::new("k1").unwrap(),
            request_fingerprint: IdempotencyRecord::fingerprint(&body).unwrap(),
            response: sample_response("abc", 3),
            stored_at: Utc::now(),
        };

        let same = IdempotencyRecord::fingerprint(&body).unwrap();
        assert!(matches!(
            record.match_request(&same),
            IdempotencyReplay::Replay(_)
        ));

        let other = serde_json::json!({"split": [{"rev": "b", "bps": 10000}]});
        let other_fp = IdempotencyRecord::fingerprint(&other).unwrap();
        assert!(matches!(
            record.match_request(&other_fp),
            IdempotencyReplay::Conflict { .. }
        ));
    }

    /// A replay hands back the stored response (including its audit event) unchanged,
    /// and the record survives a JSON round trip.
    #[test]
    fn idempotency_replay_returns_original_response_and_audit_verbatim() {
        let body = serde_json::json!({"split": [{"rev": "a", "bps": 10000}]});
        let original = sample_response("abc", 3);
        let record = IdempotencyRecord {
            key: IdempotencyKey::new("k1").unwrap(),
            request_fingerprint: IdempotencyRecord::fingerprint(&body).unwrap(),
            response: original.clone(),
            stored_at: Utc::now(),
        };

        let same = IdempotencyRecord::fingerprint(&body).unwrap();
        let IdempotencyReplay::Replay(replayed) = record.match_request(&same) else {
            panic!("expected a replay");
        };
        assert_eq!(replayed.etag, original.etag);
        assert_eq!(replayed.generation, original.generation);
        assert_eq!(replayed.audit.event_id, original.audit.event_id);
        assert_eq!(replayed.audit.verb, "set");
        let json = serde_json::to_string(&record).unwrap();
        // Serialize then deserialize the whole record; the audit event id must survive.
        let back: IdempotencyRecord = serde_json::from_str(&json).unwrap();
        assert_eq!(back.response.audit.event_id, original.audit.event_id);
    }

    /// `Deny` converts to `Unauthorized` (403); `Allow` converts to `Ok(())`.
    #[test]
    fn deny_decision_maps_to_unauthorized() {
        let denied = AuditDecision::Deny {
            policy: "local-only".to_string(),
            reason: "non-local".to_string(),
        };
        let result: Result<(), RemoteStoreError> = denied.into();
        let err = result.unwrap_err();
        assert_eq!(err.http_status(), 403);
        assert!(matches!(err, RemoteStoreError::Unauthorized { .. }));

        let allowed = AuditDecision::Allow {
            policy: "local-only".to_string(),
            reason: "ok".to_string(),
        };
        let result: Result<(), RemoteStoreError> = allowed.into();
        assert!(result.is_ok());
    }

    /// Pins the HTTP status of the variants not already covered by the deny test.
    #[test]
    fn error_status_mapping_is_stable() {
        assert_eq!(
            RemoteStoreError::PreconditionFailed(ConcurrencyConflict {
                expected_etag: None,
                actual_etag: "x".to_string(),
                expected_generation: None,
                actual_generation: 1,
            })
            .http_status(),
            412
        );
        assert_eq!(
            RemoteStoreError::PreconditionRequired {
                detail: "x".to_string()
            }
            .http_status(),
            428
        );
        assert_eq!(
            RemoteStoreError::IdempotencyConflict {
                reason: "x".to_string()
            }
            .http_status(),
            409
        );
        assert_eq!(RemoteStoreError::NotFound.http_status(), 404);
        assert_eq!(
            RemoteStoreError::IntegrityMismatch {
                expected: "a".to_string(),
                actual: "b".to_string()
            }
            .http_status(),
            422
        );
        assert_eq!(
            RemoteStoreError::NotYetImplemented {
                detail: "x".to_string()
            }
            .http_status(),
            501
        );
        assert_eq!(
            RemoteStoreError::Internal {
                message: "x".to_string()
            }
            .http_status(),
            500
        );
    }

    /// The error serializes with a kebab-case `kind` tag and deserializes back equal.
    #[test]
    fn remote_store_error_round_trips_tagged() {
        let err = RemoteStoreError::Unauthorized {
            policy: "local-only".to_string(),
            reason: "nope".to_string(),
        };
        let json = serde_json::to_value(&err).unwrap();
        assert_eq!(json["kind"], "unauthorized");
        let back: RemoteStoreError = serde_json::from_value(json).unwrap();
        assert_eq!(back, err);
    }
}