// adk_payments/journal/store.rs

1use std::sync::Arc;
2
3use adk_core::identity::AdkIdentity;
4use adk_core::{AdkError, ErrorCategory, ErrorComponent, Result};
5use adk_session::{AppendEventRequest, SessionService};
6use async_trait::async_trait;
7
8use crate::domain::{TransactionRecord, TransactionState};
9use crate::journal::memory_index::PaymentMemoryIndex;
10use crate::journal::session_state::{
11    ACTIVE_INDEX_KEY, COMPLETED_INDEX_KEY, TransactionLocator, build_journal_event, parse_locators,
12    parse_record, transaction_state_storage_key,
13};
14use crate::kernel::commands::{ListUnresolvedTransactionsRequest, TransactionLookup};
15use crate::kernel::service::TransactionStore;
16
/// Durable transaction store mirrored into app-scoped session state.
///
/// Journal reads and writes go through the wrapped [`SessionService`]. When a
/// [`PaymentMemoryIndex`] is attached, every upsert additionally indexes the
/// record's safe summary.
pub struct SessionBackedTransactionStore {
    /// Session backend used to read journal state and append journal events.
    session_service: Arc<dyn SessionService>,
    /// Optional index for safe summaries; `None` unless `with_memory_index` was called.
    memory_index: Option<PaymentMemoryIndex>,
}
22
23impl SessionBackedTransactionStore {
24    /// Creates a new transaction store backed by `adk-session`.
25    #[must_use]
26    pub fn new(session_service: Arc<dyn SessionService>) -> Self {
27        Self { session_service, memory_index: None }
28    }
29
30    /// Enables semantic indexing of safe summaries through `adk-memory`.
31    #[must_use]
32    pub fn with_memory_index(mut self, memory_index: PaymentMemoryIndex) -> Self {
33        self.memory_index = Some(memory_index);
34        self
35    }
36
37    fn require_identity<'a>(
38        identity: &'a Option<AdkIdentity>,
39        code: &'static str,
40    ) -> Result<&'a AdkIdentity> {
41        identity.as_ref().ok_or_else(|| {
42            AdkError::new(
43                ErrorComponent::Session,
44                ErrorCategory::InvalidInput,
45                code,
46                "transaction journal operations require a session identity",
47            )
48        })
49    }
50
51    fn upsert_locator(index: &mut Vec<TransactionLocator>, locator: &TransactionLocator) {
52        index.retain(|existing| existing != locator);
53        index.push(locator.clone());
54        index
55            .sort_by(|left, right| left.transaction_id.as_str().cmp(right.transaction_id.as_str()));
56    }
57
58    fn remove_locator(index: &mut Vec<TransactionLocator>, locator: &TransactionLocator) {
59        index.retain(|existing| existing != locator);
60    }
61}
62
63#[async_trait]
64impl TransactionStore for SessionBackedTransactionStore {
65    async fn upsert(&self, mut record: TransactionRecord) -> Result<()> {
66        record.recompute_safe_summary();
67        let identity =
68            Self::require_identity(&record.session_identity, "payments.journal.identity_required")?
69                .clone();
70        let session = self.session_service.get_for_identity(&identity).await?;
71        let mut active = parse_locators(session.state().get(&format!("app:{ACTIVE_INDEX_KEY}")))?;
72        let mut completed =
73            parse_locators(session.state().get(&format!("app:{COMPLETED_INDEX_KEY}")))?;
74        let locator = TransactionLocator {
75            identity: identity.clone(),
76            transaction_id: record.transaction_id.clone(),
77        };
78
79        if record.state.is_terminal() {
80            Self::remove_locator(&mut active, &locator);
81            Self::upsert_locator(&mut completed, &locator);
82        } else {
83            Self::upsert_locator(&mut active, &locator);
84            Self::remove_locator(&mut completed, &locator);
85        }
86
87        let event = build_journal_event(&record, &active, &completed)?;
88        self.session_service
89            .append_event_for_identity(AppendEventRequest { identity: identity.clone(), event })
90            .await?;
91
92        if let Some(memory_index) = &self.memory_index {
93            memory_index.index_summary(&identity, &record.safe_summary).await?;
94        }
95
96        Ok(())
97    }
98
99    async fn get(&self, lookup: TransactionLookup) -> Result<Option<TransactionRecord>> {
100        let identity =
101            Self::require_identity(&lookup.session_identity, "payments.journal.lookup_identity")?;
102        let session = self.session_service.get_for_identity(identity).await?;
103        let key = transaction_state_storage_key(identity, &lookup.transaction_id);
104        match session.state().get(&key) {
105            Some(value) => parse_record(value).map(Some),
106            None => Ok(None),
107        }
108    }
109
110    async fn list_unresolved(
111        &self,
112        request: ListUnresolvedTransactionsRequest,
113    ) -> Result<Vec<TransactionRecord>> {
114        let identity = Self::require_identity(
115            &request.session_identity,
116            "payments.journal.list_identity_required",
117        )?;
118        let session = self.session_service.get_for_identity(identity).await?;
119        let active = parse_locators(session.state().get(&format!("app:{ACTIVE_INDEX_KEY}")))?;
120        let mut records = Vec::new();
121
122        for locator in active.into_iter().filter(|locator| &locator.identity == identity) {
123            if let Some(record) = self
124                .get(TransactionLookup {
125                    transaction_id: locator.transaction_id,
126                    session_identity: Some(identity.clone()),
127                })
128                .await?
129            {
130                if !matches!(
131                    record.state,
132                    TransactionState::Completed
133                        | TransactionState::Canceled
134                        | TransactionState::Failed
135                ) {
136                    records.push(record);
137                }
138            }
139        }
140
141        records
142            .sort_by(|left, right| left.transaction_id.as_str().cmp(right.transaction_id.as_str()));
143        Ok(records)
144    }
145}
146
147#[cfg(test)]
148mod tests {
149    use std::collections::HashMap;
150
151    use adk_artifact::InMemoryArtifactService;
152    use adk_core::identity::{AdkIdentity, AppName, SessionId, UserId};
153    use adk_core::{Content, Event, EventCompaction};
154    use adk_memory::{InMemoryMemoryService, MemoryService, SearchRequest};
155    use adk_session::{CreateRequest, GetRequest, InMemorySessionService, SessionService};
156    use chrono::{TimeZone, Utc};
157
158    use super::*;
159    use crate::domain::{
160        Cart, CartLine, CommerceActor, CommerceActorRole, CommerceMode, EvidenceReference,
161        InterventionKind, InterventionState, InterventionStatus, MerchantRef, Money,
162        ProtocolDescriptor, ProtocolEnvelopeDigest, ProtocolExtensions, TransactionId,
163        TransactionState,
164    };
165    use crate::journal::ArtifactBackedEvidenceStore;
166    use crate::kernel::commands::{EvidenceLookup, StoreEvidenceCommand};
167    use crate::kernel::service::EvidenceStore;
168
169    async fn create_identity(session_service: &InMemorySessionService) -> AdkIdentity {
170        let identity = AdkIdentity::new(
171            AppName::try_from("payments-app").unwrap(),
172            UserId::try_from("user-1").unwrap(),
173            SessionId::try_from("session-1").unwrap(),
174        );
175
176        session_service
177            .create(CreateRequest {
178                app_name: identity.app_name.as_ref().to_string(),
179                user_id: identity.user_id.as_ref().to_string(),
180                session_id: Some(identity.session_id.as_ref().to_string()),
181                state: HashMap::new(),
182            })
183            .await
184            .unwrap();
185
186        identity
187    }
188
189    fn sample_record(transaction_id: &str, identity: &AdkIdentity) -> TransactionRecord {
190        let mut record = TransactionRecord::new(
191            TransactionId::from(transaction_id),
192            CommerceActor {
193                actor_id: "shopper-agent".to_string(),
194                role: CommerceActorRole::AgentSurface,
195                display_name: Some("shopper".to_string()),
196                tenant_id: Some("tenant-1".to_string()),
197                extensions: ProtocolExtensions::default(),
198            },
199            MerchantRef {
200                merchant_id: "merchant-123".to_string(),
201                legal_name: "Merchant Example LLC".to_string(),
202                display_name: Some("Merchant Example".to_string()),
203                statement_descriptor: Some("MERCHANT*EXAMPLE".to_string()),
204                country_code: Some("US".to_string()),
205                website: Some("https://merchant.example".to_string()),
206                extensions: ProtocolExtensions::default(),
207            },
208            CommerceMode::HumanPresent,
209            Cart {
210                cart_id: Some(format!("cart-{transaction_id}")),
211                lines: vec![CartLine {
212                    line_id: format!("line-{transaction_id}"),
213                    merchant_sku: Some("sku-123".to_string()),
214                    title: "Widget".to_string(),
215                    quantity: 1,
216                    unit_price: Money::new("USD", 1_500, 2),
217                    total_price: Money::new("USD", 1_500, 2),
218                    product_class: Some("widgets".to_string()),
219                    extensions: ProtocolExtensions::default(),
220                }],
221                subtotal: Some(Money::new("USD", 1_500, 2)),
222                adjustments: Vec::new(),
223                total: Money::new("USD", 1_500, 2),
224                affiliate_attribution: None,
225                extensions: ProtocolExtensions::default(),
226            },
227            Utc.with_ymd_and_hms(2026, 3, 22, 10, 0, 0).unwrap(),
228        );
229        record.session_identity = Some(identity.clone());
230        record
231    }
232
233    #[tokio::test]
234    async fn recalls_unresolved_and_completed_transactions_after_compaction_like_history_loss() {
235        let session_service = Arc::new(InMemorySessionService::new());
236        let identity = create_identity(session_service.as_ref()).await;
237        let store = SessionBackedTransactionStore::new(session_service.clone());
238
239        let mut unresolved = sample_record("tx-unresolved", &identity);
240        unresolved
241            .transition_to(
242                TransactionState::Negotiating,
243                Utc.with_ymd_and_hms(2026, 3, 22, 10, 5, 0).unwrap(),
244            )
245            .unwrap();
246        unresolved
247            .transition_to(
248                TransactionState::InterventionRequired(Box::new(InterventionState {
249                    intervention_id: "int-1".to_string(),
250                    kind: InterventionKind::BuyerReconfirmation,
251                    status: InterventionStatus::Pending,
252                    instructions: Some(
253                        "return to the user for explicit reconfirmation".to_string(),
254                    ),
255                    continuation_token: Some("continue-123".to_string()),
256                    requested_by: None,
257                    expires_at: None,
258                    extensions: ProtocolExtensions::default(),
259                })),
260                Utc.with_ymd_and_hms(2026, 3, 22, 10, 10, 0).unwrap(),
261            )
262            .unwrap();
263        store.upsert(unresolved.clone()).await.unwrap();
264
265        let mut completed = sample_record("tx-completed", &identity);
266        completed
267            .transition_to(
268                TransactionState::Negotiating,
269                Utc.with_ymd_and_hms(2026, 3, 22, 10, 5, 0).unwrap(),
270            )
271            .unwrap();
272        completed
273            .transition_to(
274                TransactionState::AwaitingPaymentMethod,
275                Utc.with_ymd_and_hms(2026, 3, 22, 10, 6, 0).unwrap(),
276            )
277            .unwrap();
278        completed
279            .transition_to(
280                TransactionState::Authorized,
281                Utc.with_ymd_and_hms(2026, 3, 22, 10, 7, 0).unwrap(),
282            )
283            .unwrap();
284        completed
285            .transition_to(
286                TransactionState::Completed,
287                Utc.with_ymd_and_hms(2026, 3, 22, 10, 8, 0).unwrap(),
288            )
289            .unwrap();
290        store.upsert(completed.clone()).await.unwrap();
291
292        let mut compaction_event = Event::new("runner.compaction");
293        compaction_event.author = "runner".to_string();
294        compaction_event.set_content(Content::new("system").with_text("compacted payment history"));
295        compaction_event.actions.compaction = Some(EventCompaction {
296            start_timestamp: unresolved.created_at,
297            end_timestamp: Utc.with_ymd_and_hms(2026, 3, 22, 10, 20, 0).unwrap(),
298            compacted_content: Content::new("system").with_text("older conversation compacted"),
299        });
300        session_service
301            .append_event_for_identity(adk_session::AppendEventRequest {
302                identity: identity.clone(),
303                event: compaction_event,
304            })
305            .await
306            .unwrap();
307
308        let recent_session = session_service
309            .get(GetRequest {
310                app_name: identity.app_name.as_ref().to_string(),
311                user_id: identity.user_id.as_ref().to_string(),
312                session_id: identity.session_id.as_ref().to_string(),
313                num_recent_events: Some(1),
314                after: None,
315            })
316            .await
317            .unwrap();
318        assert_eq!(recent_session.events().len(), 1);
319
320        let unresolved_after = store
321            .list_unresolved(ListUnresolvedTransactionsRequest {
322                session_identity: Some(identity.clone()),
323            })
324            .await
325            .unwrap();
326        assert_eq!(unresolved_after.len(), 1);
327        assert_eq!(unresolved_after[0].transaction_id.as_str(), "tx-unresolved");
328        assert_eq!(
329            unresolved_after[0].safe_summary.state,
330            crate::domain::TransactionStateTag::InterventionRequired
331        );
332
333        let completed_after = store
334            .get(TransactionLookup {
335                transaction_id: TransactionId::from("tx-completed"),
336                session_identity: Some(identity.clone()),
337            })
338            .await
339            .unwrap()
340            .unwrap();
341        assert_eq!(completed_after.state, TransactionState::Completed);
342    }
343
344    #[tokio::test]
345    async fn keeps_raw_evidence_out_of_state_transcript_and_memory() {
346        let session_service = Arc::new(InMemorySessionService::new());
347        let artifact_service = Arc::new(InMemoryArtifactService::new());
348        let memory_service = Arc::new(InMemoryMemoryService::new());
349        let identity = create_identity(session_service.as_ref()).await;
350        let store = SessionBackedTransactionStore::new(session_service.clone())
351            .with_memory_index(PaymentMemoryIndex::new(memory_service.clone()));
352        let evidence_store = ArtifactBackedEvidenceStore::new(artifact_service);
353
354        let raw_secret =
355            r#"{"pan":"4111111111111111","cvc":"123","signedAuthorization":"signed_blob"}"#;
356
357        let mut record = sample_record("tx-secret", &identity);
358        store.upsert(record.clone()).await.unwrap();
359
360        let stored = evidence_store
361            .store(StoreEvidenceCommand {
362                transaction_id: record.transaction_id.clone(),
363                session_identity: Some(identity.clone()),
364                evidence_ref: EvidenceReference {
365                    evidence_id: "payment-mandate-raw".to_string(),
366                    protocol: ProtocolDescriptor::ap2("v0.1-alpha"),
367                    artifact_kind: "payment_mandate".to_string(),
368                    digest: None,
369                },
370                body: raw_secret.as_bytes().to_vec(),
371                content_type: "application/json".to_string(),
372            })
373            .await
374            .unwrap();
375
376        record.attach_evidence_ref(stored.evidence_ref.clone());
377        record.attach_evidence_digest(ProtocolEnvelopeDigest::new(
378            stored.evidence_ref.clone(),
379            Utc.with_ymd_and_hms(2026, 3, 22, 10, 11, 0).unwrap(),
380        ));
381        store.upsert(record.clone()).await.unwrap();
382
383        let session = session_service.get_for_identity(&identity).await.unwrap();
384        let state_json = serde_json::to_string(&session.state().all()).unwrap();
385        assert!(!state_json.contains("4111111111111111"));
386        assert!(!state_json.contains("signed_blob"));
387
388        let transcript_text = session
389            .events()
390            .all()
391            .into_iter()
392            .filter_map(|event| event.content().cloned())
393            .flat_map(|content| content.parts.into_iter())
394            .filter_map(|part| part.text().map(ToString::to_string))
395            .collect::<Vec<_>>()
396            .join(" ");
397        assert!(!transcript_text.contains("4111111111111111"));
398        assert!(!transcript_text.contains("signed_blob"));
399        assert!(transcript_text.contains("Merchant Example"));
400
401        let memories = memory_service
402            .search(SearchRequest {
403                query: "Merchant Example Widget".to_string(),
404                user_id: identity.user_id.as_ref().to_string(),
405                app_name: identity.app_name.as_ref().to_string(),
406                limit: None,
407                min_score: None,
408            })
409            .await
410            .unwrap();
411        let memory_text = memories
412            .memories
413            .into_iter()
414            .flat_map(|entry| entry.content.parts.into_iter())
415            .filter_map(|part| part.text().map(ToString::to_string))
416            .collect::<Vec<_>>()
417            .join(" ");
418        assert!(!memory_text.contains("4111111111111111"));
419        assert!(!memory_text.contains("signed_blob"));
420        assert!(memory_text.contains("Merchant Example"));
421
422        let loaded = evidence_store
423            .load(EvidenceLookup {
424                evidence_ref: stored.evidence_ref.clone(),
425                session_identity: Some(identity.clone()),
426            })
427            .await
428            .unwrap()
429            .unwrap();
430        assert_eq!(loaded.body, raw_secret.as_bytes());
431        assert_eq!(loaded.content_type, "application/json");
432    }
433
434    #[tokio::test]
435    async fn redacts_sensitive_summary_data_before_transcript_and_memory_writes() {
436        let session_service = Arc::new(InMemorySessionService::new());
437        let memory_service = Arc::new(InMemoryMemoryService::new());
438        let identity = create_identity(session_service.as_ref()).await;
439        let store = SessionBackedTransactionStore::new(session_service.clone())
440            .with_memory_index(PaymentMemoryIndex::new(memory_service.clone()));
441
442        let mut record = sample_record("tx-redacted", &identity);
443        record.cart.lines[0].title = "Widget 4111 1111 1111 1111".to_string();
444        record
445            .transition_to(
446                TransactionState::Negotiating,
447                Utc.with_ymd_and_hms(2026, 3, 22, 10, 5, 0).unwrap(),
448            )
449            .unwrap();
450        record
451            .transition_to(
452                TransactionState::InterventionRequired(Box::new(InterventionState {
453                    intervention_id: "int-summary".to_string(),
454                    kind: InterventionKind::BuyerReconfirmation,
455                    status: InterventionStatus::Pending,
456                    instructions: Some(
457                        "billing address: 123 Main St; email payer@example.com; signature=signed_blob"
458                            .to_string(),
459                    ),
460                    continuation_token: Some("continue-123".to_string()),
461                    requested_by: None,
462                    expires_at: None,
463                    extensions: ProtocolExtensions::default(),
464                })),
465                Utc.with_ymd_and_hms(2026, 3, 22, 10, 6, 0).unwrap(),
466            )
467            .unwrap();
468
469        store.upsert(record.clone()).await.unwrap();
470
471        let session = session_service.get_for_identity(&identity).await.unwrap();
472        let transcript_text = session
473            .events()
474            .all()
475            .into_iter()
476            .filter_map(|event| event.content().cloned())
477            .flat_map(|content| content.parts.into_iter())
478            .filter_map(|part| part.text().map(ToString::to_string))
479            .collect::<Vec<_>>()
480            .join(" ");
481        assert!(!transcript_text.contains("4111 1111 1111 1111"));
482        assert!(!transcript_text.contains("payer@example.com"));
483        assert!(!transcript_text.contains("signed_blob"));
484        assert!(transcript_text.contains("[CARD ****1111]"));
485        assert!(transcript_text.contains("[EMAIL REDACTED]"));
486        assert!(transcript_text.contains("[REDACTED sha256:"));
487
488        let memories = memory_service
489            .search(SearchRequest {
490                query: "Merchant Widget".to_string(),
491                user_id: identity.user_id.as_ref().to_string(),
492                app_name: identity.app_name.as_ref().to_string(),
493                limit: None,
494                min_score: None,
495            })
496            .await
497            .unwrap();
498        let memory_text = memories
499            .memories
500            .into_iter()
501            .flat_map(|entry| entry.content.parts.into_iter())
502            .filter_map(|part| part.text().map(ToString::to_string))
503            .collect::<Vec<_>>()
504            .join(" ");
505        assert!(!memory_text.contains("4111 1111 1111 1111"));
506        assert!(!memory_text.contains("payer@example.com"));
507        assert!(!memory_text.contains("signed_blob"));
508        assert!(memory_text.contains("[CARD ****1111]"));
509        assert!(memory_text.contains("[EMAIL REDACTED]"));
510        assert!(memory_text.contains("[REDACTED sha256:"));
511    }
512}