Skip to main content

adk_payments/kernel/
correlator.rs

1//! Kernel-mediated cross-protocol correlation.
2//!
3//! Routes ACP stable `2026-01-30` and AP2 `v0.1-alpha` adapters through the
4//! same canonical transaction ID and journal model. Provides best-effort
5//! canonical projections where safe and returns explicit errors where direct
6//! protocol-to-protocol conversion would lose semantics or accountability
7//! evidence.
8
9use std::sync::Arc;
10
11use adk_core::Result;
12
13use crate::domain::{
14    Cart, FulfillmentSelection, OrderSnapshot, PaymentMethodSelection, ProtocolExtensions,
15    ProtocolRefs, TransactionRecord, TransactionState,
16};
17use crate::kernel::commands::TransactionLookup;
18use crate::kernel::errors::PaymentsKernelError;
19use crate::kernel::service::TransactionStore;
20
/// Identifies which protocol a correlation operation originates from.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProtocolOrigin {
    /// The operation originates from the ACP side.
    Acp,
    /// The operation originates from the AP2 side.
    Ap2,
}

impl ProtocolOrigin {
    /// Lower-case protocol name: `"acp"` for [`Self::Acp`], `"ap2"` for
    /// [`Self::Ap2`].
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        const ACP_NAME: &str = "acp";
        const AP2_NAME: &str = "ap2";

        match self {
            ProtocolOrigin::Acp => ACP_NAME,
            ProtocolOrigin::Ap2 => AP2_NAME,
        }
    }
}
38
/// Names one protocol reference slot in [`ProtocolRefs`].
///
/// The enum is fieldless, so it is `Copy` (matching [`ProtocolOrigin`]) and
/// can be passed around and logged without cloning.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProtocolRefKind {
    /// The `acp_checkout_session_id` slot.
    AcpCheckoutSessionId,
    /// The `acp_order_id` slot.
    AcpOrderId,
    /// The `acp_delegate_payment_id` slot.
    AcpDelegatePaymentId,
    /// The `ap2_intent_mandate_id` slot.
    Ap2IntentMandateId,
    /// The `ap2_cart_mandate_id` slot.
    Ap2CartMandateId,
    /// The `ap2_payment_mandate_id` slot.
    Ap2PaymentMandateId,
    /// The `ap2_payment_receipt_id` slot.
    Ap2PaymentReceiptId,
}
50
/// Outcome of attempting to project protocol data into a canonical type.
#[derive(Debug, Clone, PartialEq)]
pub enum ProjectionResult<T> {
    /// The value was projected without semantic loss.
    Projected(T),
    /// No safe projection exists; the fields say which field was refused, for
    /// which source protocol, and why.
    Unsupported { field: String, source_protocol: String, reason: String },
}

impl<T> ProjectionResult<T> {
    /// Consumes the result, yielding `Some(value)` for [`Self::Projected`] and
    /// `None` for [`Self::Unsupported`].
    #[must_use]
    pub fn ok(self) -> Option<T> {
        if let Self::Projected(projected) = self {
            Some(projected)
        } else {
            None
        }
    }

    /// Reports whether this result is the [`Self::Projected`] variant.
    #[must_use]
    pub fn is_projected(&self) -> bool {
        match self {
            Self::Projected(_) => true,
            Self::Unsupported { .. } => false,
        }
    }
}
76
77/// Cross-protocol correlator that routes ACP and AP2 adapters through the same
78/// canonical transaction ID and journal model.
79///
80/// The correlator enforces three rules:
81/// 1. Both protocols share one canonical `TransactionId` per transaction.
82/// 2. Protocol-specific identifiers are correlated in [`ProtocolRefs`] without
83///    assuming they are interchangeable.
84/// 3. Direct protocol-to-protocol conversion is refused when it would lose
85///    semantics or accountability evidence.
86pub struct ProtocolCorrelator {
87    transaction_store: Arc<dyn TransactionStore>,
88}
89
90impl ProtocolCorrelator {
91    /// Creates a new correlator backed by the canonical transaction store.
92    #[must_use]
93    pub fn new(transaction_store: Arc<dyn TransactionStore>) -> Self {
94        Self { transaction_store }
95    }
96
97    /// Looks up a canonical transaction by its internal ID.
98    pub async fn get_transaction(
99        &self,
100        lookup: TransactionLookup,
101    ) -> Result<Option<TransactionRecord>> {
102        self.transaction_store.get(lookup).await
103    }
104
105    /// Looks up a canonical transaction by an ACP checkout session ID.
106    ///
107    /// Scans unresolved transactions for a matching `protocol_refs.acp_checkout_session_id`.
108    /// For production use, a dedicated index would be more efficient.
109    pub async fn find_by_acp_checkout_session_id(
110        &self,
111        session_identity: Option<adk_core::AdkIdentity>,
112        acp_checkout_session_id: &str,
113    ) -> Result<Option<TransactionRecord>> {
114        let unresolved = self
115            .transaction_store
116            .list_unresolved(crate::kernel::commands::ListUnresolvedTransactionsRequest {
117                session_identity: session_identity.clone(),
118            })
119            .await?;
120
121        Ok(unresolved.into_iter().find(|record| {
122            record.protocol_refs.acp_checkout_session_id.as_deref() == Some(acp_checkout_session_id)
123        }))
124    }
125
126    /// Looks up a canonical transaction by an AP2 mandate ID (intent, cart, or payment).
127    pub async fn find_by_ap2_mandate_id(
128        &self,
129        session_identity: Option<adk_core::AdkIdentity>,
130        mandate_id: &str,
131    ) -> Result<Option<TransactionRecord>> {
132        let unresolved = self
133            .transaction_store
134            .list_unresolved(crate::kernel::commands::ListUnresolvedTransactionsRequest {
135                session_identity: session_identity.clone(),
136            })
137            .await?;
138
139        Ok(unresolved.into_iter().find(|record| {
140            let refs = &record.protocol_refs;
141            refs.ap2_intent_mandate_id.as_deref() == Some(mandate_id)
142                || refs.ap2_cart_mandate_id.as_deref() == Some(mandate_id)
143                || refs.ap2_payment_mandate_id.as_deref() == Some(mandate_id)
144        }))
145    }
146
147    /// Attaches a protocol-specific reference to an existing transaction record.
148    ///
149    /// This is the canonical way to correlate ACP and AP2 identifiers under one
150    /// transaction. The correlator never overwrites an existing reference slot
151    /// with a different value.
152    ///
153    /// # Errors
154    ///
155    /// Returns an error if the slot is already occupied by a different value.
156    pub fn attach_protocol_ref(
157        record: &mut TransactionRecord,
158        kind: ProtocolRefKind,
159        value: String,
160    ) -> std::result::Result<(), PaymentsKernelError> {
161        let slot = match &kind {
162            ProtocolRefKind::AcpCheckoutSessionId => {
163                &mut record.protocol_refs.acp_checkout_session_id
164            }
165            ProtocolRefKind::AcpOrderId => &mut record.protocol_refs.acp_order_id,
166            ProtocolRefKind::AcpDelegatePaymentId => {
167                &mut record.protocol_refs.acp_delegate_payment_id
168            }
169            ProtocolRefKind::Ap2IntentMandateId => &mut record.protocol_refs.ap2_intent_mandate_id,
170            ProtocolRefKind::Ap2CartMandateId => &mut record.protocol_refs.ap2_cart_mandate_id,
171            ProtocolRefKind::Ap2PaymentMandateId => {
172                &mut record.protocol_refs.ap2_payment_mandate_id
173            }
174            ProtocolRefKind::Ap2PaymentReceiptId => {
175                &mut record.protocol_refs.ap2_payment_receipt_id
176            }
177        };
178
179        if let Some(existing) = slot.as_ref() {
180            if existing != &value {
181                return Err(PaymentsKernelError::UnsupportedAction {
182                    action: format!("rebind protocol ref {kind:?} from `{existing}` to `{value}`"),
183                    protocol: "kernel".to_string(),
184                });
185            }
186            return Ok(());
187        }
188
189        *slot = Some(value);
190        Ok(())
191    }
192
193    /// Returns all protocol identifiers correlated to one canonical transaction.
194    #[must_use]
195    pub fn correlated_refs(record: &TransactionRecord) -> &ProtocolRefs {
196        &record.protocol_refs
197    }
198
199    /// Returns the set of protocol names that have contributed evidence to this
200    /// transaction.
201    #[must_use]
202    pub fn contributing_protocols(record: &TransactionRecord) -> Vec<String> {
203        let mut protocols = std::collections::BTreeSet::new();
204
205        for digest in &record.evidence_digests {
206            protocols.insert(digest.evidence_ref.protocol.name.clone());
207        }
208        for evidence_ref in &record.evidence_refs {
209            protocols.insert(evidence_ref.protocol.name.clone());
210        }
211        for envelope in record.extensions.as_slice() {
212            protocols.insert(envelope.protocol.name.clone());
213        }
214
215        if record.protocol_refs.acp_checkout_session_id.is_some()
216            || record.protocol_refs.acp_order_id.is_some()
217            || record.protocol_refs.acp_delegate_payment_id.is_some()
218        {
219            protocols.insert("acp".to_string());
220        }
221        if record.protocol_refs.ap2_intent_mandate_id.is_some()
222            || record.protocol_refs.ap2_cart_mandate_id.is_some()
223            || record.protocol_refs.ap2_payment_mandate_id.is_some()
224            || record.protocol_refs.ap2_payment_receipt_id.is_some()
225        {
226            protocols.insert("ap2".to_string());
227        }
228
229        protocols.into_iter().collect()
230    }
231
232    /// Returns `true` when both ACP and AP2 have contributed to this transaction.
233    #[must_use]
234    pub fn is_dual_protocol(record: &TransactionRecord) -> bool {
235        let protocols = Self::contributing_protocols(record);
236        protocols.contains(&"acp".to_string()) && protocols.contains(&"ap2".to_string())
237    }
238}
239
240// ---------------------------------------------------------------------------
241// Best-effort canonical projections (Task 9.2)
242// ---------------------------------------------------------------------------
243
244/// Best-effort canonical projections where ACP or AP2 data can be mapped
245/// safely without semantic loss.
246///
247/// These projections are intentionally one-directional: protocol data is
248/// projected into canonical kernel types. The kernel never projects canonical
249/// data back into a different protocol's wire format because that would
250/// fabricate provenance.
251impl ProtocolCorrelator {
252    /// Projects an ACP cart (line items + totals) into the canonical cart model.
253    ///
254    /// This projection is safe because ACP line items, totals, and currency map
255    /// directly to canonical `Cart` fields without losing structure.
256    #[must_use]
257    pub fn project_acp_cart_to_canonical(record: &TransactionRecord) -> ProjectionResult<Cart> {
258        if record.cart.cart_id.is_some() || !record.cart.lines.is_empty() {
259            return ProjectionResult::Projected(record.cart.clone());
260        }
261        ProjectionResult::Unsupported {
262            field: "cart".to_string(),
263            source_protocol: "acp".to_string(),
264            reason: "no cart data available in the transaction record".to_string(),
265        }
266    }
267
268    /// Projects an AP2 cart mandate's payment details into the canonical cart model.
269    ///
270    /// This projection is safe because AP2 `PaymentRequest.details.displayItems`
271    /// and `total` map to canonical `CartLine` and `Cart.total` without losing
272    /// the item-level structure.
273    #[must_use]
274    pub fn project_ap2_cart_to_canonical(record: &TransactionRecord) -> ProjectionResult<Cart> {
275        if record.cart.cart_id.is_some() || !record.cart.lines.is_empty() {
276            return ProjectionResult::Projected(record.cart.clone());
277        }
278        ProjectionResult::Unsupported {
279            field: "cart".to_string(),
280            source_protocol: "ap2".to_string(),
281            reason: "no cart data available in the transaction record".to_string(),
282        }
283    }
284
285    /// Projects ACP or AP2 order updates into the canonical order snapshot.
286    ///
287    /// Both protocols produce order state that maps to the canonical
288    /// `OrderSnapshot` without semantic loss.
289    #[must_use]
290    pub fn project_order_to_canonical(
291        record: &TransactionRecord,
292    ) -> ProjectionResult<OrderSnapshot> {
293        match &record.order {
294            Some(order) => ProjectionResult::Projected(order.clone()),
295            None => ProjectionResult::Unsupported {
296                field: "order".to_string(),
297                source_protocol: "kernel".to_string(),
298                reason: "transaction has no order snapshot yet".to_string(),
299            },
300        }
301    }
302
303    /// Projects the canonical transaction state into a protocol-neutral
304    /// settlement summary.
305    ///
306    /// Both ACP order updates and AP2 payment receipts can update canonical
307    /// settlement state, so this projection is safe in both directions.
308    #[must_use]
309    pub fn project_settlement_state(
310        record: &TransactionRecord,
311    ) -> ProjectionResult<TransactionState> {
312        ProjectionResult::Projected(record.state.clone())
313    }
314
315    /// Projects the canonical fulfillment selection.
316    ///
317    /// Both ACP fulfillment options and AP2 shipping options map to the
318    /// canonical `FulfillmentSelection` without semantic loss.
319    #[must_use]
320    pub fn project_fulfillment_to_canonical(
321        record: &TransactionRecord,
322    ) -> ProjectionResult<FulfillmentSelection> {
323        match &record.fulfillment {
324            Some(fulfillment) => ProjectionResult::Projected(fulfillment.clone()),
325            None => ProjectionResult::Unsupported {
326                field: "fulfillment".to_string(),
327                source_protocol: "kernel".to_string(),
328                reason: "transaction has no fulfillment selection".to_string(),
329            },
330        }
331    }
332
333    /// Projects the canonical payment method selection.
334    ///
335    /// Both ACP payment handlers and AP2 payment response method names map to
336    /// the canonical `PaymentMethodSelection` without semantic loss.
337    #[must_use]
338    pub fn project_payment_method(
339        record: &TransactionRecord,
340    ) -> ProjectionResult<PaymentMethodSelection> {
341        // Payment method is stored in extensions by both adapters.
342        // Look for the most recent payment method selection in extensions.
343        for envelope in record.extensions.as_slice().iter().rev() {
344            if let Some(selection_kind) =
345                envelope.fields.get("selection_kind").and_then(serde_json::Value::as_str)
346            {
347                return ProjectionResult::Projected(PaymentMethodSelection {
348                    selection_kind: selection_kind.to_string(),
349                    reference: envelope
350                        .fields
351                        .get("reference")
352                        .and_then(serde_json::Value::as_str)
353                        .map(str::to_string),
354                    display_hint: envelope
355                        .fields
356                        .get("display_hint")
357                        .and_then(serde_json::Value::as_str)
358                        .map(str::to_string),
359                    extensions: ProtocolExtensions::default(),
360                });
361            }
362        }
363        ProjectionResult::Unsupported {
364            field: "payment_method".to_string(),
365            source_protocol: "kernel".to_string(),
366            reason: "no payment method selection found in transaction extensions".to_string(),
367        }
368    }
369}
370
371// ---------------------------------------------------------------------------
372// Lossy conversion guards (Task 9.3)
373// ---------------------------------------------------------------------------
374
/// Describes an unsafe cross-protocol conversion that the kernel refuses.
///
/// Implements [`std::error::Error`], so it can be boxed, chained, or returned
/// wherever a standard error is expected.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LossyConversionError {
    /// Name of the protocol the data comes from.
    pub source_protocol: String,
    /// Name of the protocol the caller tried to convert into.
    pub target_protocol: String,
    /// The field or artifact whose conversion was refused.
    pub field: String,
    /// Human-readable explanation of why the conversion is unsafe.
    pub reason: String,
}

impl std::fmt::Display for LossyConversionError {
    /// Renders the refusal as a single line naming the field, both protocols,
    /// and the reason.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { source_protocol, target_protocol, field, reason } = self;
        write!(
            f,
            "lossy conversion refused: `{field}` from {source_protocol} cannot be safely mapped to {target_protocol} ({reason})"
        )
    }
}

impl std::error::Error for LossyConversionError {}
393
394impl From<LossyConversionError> for PaymentsKernelError {
395    fn from(value: LossyConversionError) -> Self {
396        PaymentsKernelError::UnsupportedAction {
397            action: format!(
398                "convert `{}` from {} to {}",
399                value.field, value.source_protocol, value.target_protocol
400            ),
401            protocol: value.source_protocol,
402        }
403    }
404}
405
406/// Guards against unsafe direct protocol-to-protocol conversions.
407///
408/// The kernel mediates all cross-protocol operations. These guards return
409/// explicit errors when a direct ACP-to-AP2 or AP2-to-ACP conversion would
410/// lose semantics or accountability evidence.
411impl ProtocolCorrelator {
412    /// Refuses direct conversion of an ACP delegated payment token to an AP2
413    /// user authorization credential.
414    ///
415    /// ACP delegated payment tokens are scoped PSP credentials with merchant
416    /// and amount constraints. AP2 user authorization artifacts are
417    /// cryptographic proofs of user consent. Converting one to the other would
418    /// fabricate provenance.
419    pub fn refuse_acp_delegate_to_ap2_authorization(
420        field: &str,
421    ) -> std::result::Result<(), PaymentsKernelError> {
422        Err(LossyConversionError {
423            source_protocol: "acp".to_string(),
424            target_protocol: "ap2".to_string(),
425            field: field.to_string(),
426            reason: "ACP delegated payment tokens are scoped PSP credentials; \
427                     AP2 user authorization artifacts are cryptographic proofs of user consent. \
428                     Converting one to the other would fabricate provenance."
429                .to_string(),
430        }
431        .into())
432    }
433
434    /// Refuses direct conversion of an AP2 signed user authorization to an ACP
435    /// delegated payment token.
436    ///
437    /// AP2 user authorization presentations prove user consent through
438    /// cryptographic signatures. ACP delegated payment tokens are PSP-issued
439    /// scoped credentials. Converting one to the other would lose the
440    /// cryptographic accountability chain.
441    pub fn refuse_ap2_authorization_to_acp_delegate(
442        field: &str,
443    ) -> std::result::Result<(), PaymentsKernelError> {
444        Err(LossyConversionError {
445            source_protocol: "ap2".to_string(),
446            target_protocol: "acp".to_string(),
447            field: field.to_string(),
448            reason: "AP2 signed user authorization artifacts prove consent through \
449                     cryptographic signatures. ACP delegated payment tokens are PSP-issued \
450                     scoped credentials. Converting one to the other would lose the \
451                     cryptographic accountability chain."
452                .to_string(),
453        }
454        .into())
455    }
456
457    /// Refuses direct conversion of ACP checkout session state to AP2 mandate
458    /// state.
459    ///
460    /// ACP checkout sessions are merchant-facing HTTP resources with
461    /// server-managed lifecycle. AP2 mandates are signed authorization
462    /// artifacts with explicit role separation. The state models are not
463    /// equivalent.
464    pub fn refuse_acp_session_to_ap2_mandate(
465        field: &str,
466    ) -> std::result::Result<(), PaymentsKernelError> {
467        Err(LossyConversionError {
468            source_protocol: "acp".to_string(),
469            target_protocol: "ap2".to_string(),
470            field: field.to_string(),
471            reason: "ACP checkout sessions are merchant-facing HTTP resources with \
472                     server-managed lifecycle. AP2 mandates are signed authorization \
473                     artifacts with explicit role separation. Direct conversion would \
474                     lose the authorization model."
475                .to_string(),
476        }
477        .into())
478    }
479
480    /// Refuses direct conversion of AP2 mandate state to ACP checkout session
481    /// state.
482    ///
483    /// AP2 mandates carry cryptographic authorization chains and role-separated
484    /// provenance. ACP checkout sessions are server-managed merchant resources.
485    /// Direct conversion would discard the mandate's authorization evidence.
486    pub fn refuse_ap2_mandate_to_acp_session(
487        field: &str,
488    ) -> std::result::Result<(), PaymentsKernelError> {
489        Err(LossyConversionError {
490            source_protocol: "ap2".to_string(),
491            target_protocol: "acp".to_string(),
492            field: field.to_string(),
493            reason: "AP2 mandates carry cryptographic authorization chains and role-separated \
494                     provenance. ACP checkout sessions are server-managed merchant resources. \
495                     Direct conversion would discard the mandate's authorization evidence."
496                .to_string(),
497        }
498        .into())
499    }
500
501    /// Validates that a cross-protocol operation goes through the kernel rather
502    /// than attempting direct protocol-to-protocol transcoding.
503    ///
504    /// Returns `Ok(())` when the operation is kernel-mediated (both sides use
505    /// the canonical transaction). Returns an error when the caller attempts
506    /// to bypass the kernel.
507    pub fn require_kernel_mediation(
508        record: &TransactionRecord,
509        source: ProtocolOrigin,
510        target: ProtocolOrigin,
511        operation: &str,
512    ) -> std::result::Result<(), PaymentsKernelError> {
513        if source == target {
514            return Ok(());
515        }
516
517        // The operation is kernel-mediated if the record has a canonical
518        // transaction ID and both protocols have contributed through the kernel.
519        if record.transaction_id.as_str().is_empty() {
520            return Err(PaymentsKernelError::UnsupportedAction {
521                action: format!("cross-protocol `{operation}` requires a canonical transaction ID"),
522                protocol: source.as_str().to_string(),
523            });
524        }
525
526        Ok(())
527    }
528}