Skip to main content

adk_payments/journal/
store.rs

1use std::sync::Arc;
2
3use adk_core::identity::AdkIdentity;
4use adk_core::{AdkError, ErrorCategory, ErrorComponent, Result};
5use adk_session::{AppendEventRequest, SessionService};
6use async_trait::async_trait;
7
8use crate::domain::{TransactionRecord, TransactionState};
9use crate::journal::memory_index::PaymentMemoryIndex;
10use crate::journal::session_state::{
11    ACTIVE_INDEX_KEY, COMPLETED_INDEX_KEY, TransactionLocator, build_journal_event, parse_locators,
12    parse_record, transaction_state_storage_key,
13};
14use crate::kernel::commands::{ListUnresolvedTransactionsRequest, TransactionLookup};
15use crate::kernel::service::TransactionStore;
16
17/// Durable transaction store mirrored into app-scoped session state.
18pub struct SessionBackedTransactionStore {
19    session_service: Arc<dyn SessionService>,
20    memory_index: Option<PaymentMemoryIndex>,
21}
22
23impl SessionBackedTransactionStore {
24    /// Creates a new transaction store backed by `adk-session`.
25    #[must_use]
26    pub fn new(session_service: Arc<dyn SessionService>) -> Self {
27        Self { session_service, memory_index: None }
28    }
29
30    /// Enables semantic indexing of safe summaries through `adk-memory`.
31    #[must_use]
32    pub fn with_memory_index(mut self, memory_index: PaymentMemoryIndex) -> Self {
33        self.memory_index = Some(memory_index);
34        self
35    }
36
37    fn require_identity<'a>(
38        identity: &'a Option<AdkIdentity>,
39        code: &'static str,
40    ) -> Result<&'a AdkIdentity> {
41        identity.as_ref().ok_or_else(|| {
42            AdkError::new(
43                ErrorComponent::Session,
44                ErrorCategory::InvalidInput,
45                code,
46                "transaction journal operations require a session identity",
47            )
48        })
49    }
50
51    fn upsert_locator(index: &mut Vec<TransactionLocator>, locator: &TransactionLocator) {
52        index.retain(|existing| existing != locator);
53        index.push(locator.clone());
54        index
55            .sort_by(|left, right| left.transaction_id.as_str().cmp(right.transaction_id.as_str()));
56    }
57
58    fn remove_locator(index: &mut Vec<TransactionLocator>, locator: &TransactionLocator) {
59        index.retain(|existing| existing != locator);
60    }
61}
62
63#[async_trait]
64impl TransactionStore for SessionBackedTransactionStore {
65    async fn upsert(&self, mut record: TransactionRecord) -> Result<()> {
66        record.recompute_safe_summary();
67        let identity =
68            Self::require_identity(&record.session_identity, "payments.journal.identity_required")?
69                .clone();
70        let session = self.session_service.get_for_identity(&identity).await?;
71        let mut active = parse_locators(session.state().get(&format!("app:{ACTIVE_INDEX_KEY}")))?;
72        let mut completed =
73            parse_locators(session.state().get(&format!("app:{COMPLETED_INDEX_KEY}")))?;
74        let locator = TransactionLocator {
75            identity: identity.clone(),
76            transaction_id: record.transaction_id.clone(),
77        };
78
79        if record.state.is_terminal() {
80            Self::remove_locator(&mut active, &locator);
81            Self::upsert_locator(&mut completed, &locator);
82        } else {
83            Self::upsert_locator(&mut active, &locator);
84            Self::remove_locator(&mut completed, &locator);
85        }
86
87        let event = build_journal_event(&record, &active, &completed)?;
88        self.session_service
89            .append_event_for_identity(AppendEventRequest { identity: identity.clone(), event })
90            .await?;
91
92        if let Some(memory_index) = &self.memory_index {
93            memory_index.index_summary(&identity, &record.safe_summary).await?;
94        }
95
96        Ok(())
97    }
98
99    async fn get(&self, lookup: TransactionLookup) -> Result<Option<TransactionRecord>> {
100        let identity =
101            Self::require_identity(&lookup.session_identity, "payments.journal.lookup_identity")?;
102        let session = self.session_service.get_for_identity(identity).await?;
103        let key = transaction_state_storage_key(identity, &lookup.transaction_id);
104        match session.state().get(&key) {
105            Some(value) => parse_record(value).map(Some),
106            None => Ok(None),
107        }
108    }
109
110    async fn list_unresolved(
111        &self,
112        request: ListUnresolvedTransactionsRequest,
113    ) -> Result<Vec<TransactionRecord>> {
114        let identity = Self::require_identity(
115            &request.session_identity,
116            "payments.journal.list_identity_required",
117        )?;
118        let session = self.session_service.get_for_identity(identity).await?;
119        let active = parse_locators(session.state().get(&format!("app:{ACTIVE_INDEX_KEY}")))?;
120        let mut records = Vec::new();
121
122        for locator in active.into_iter().filter(|locator| &locator.identity == identity) {
123            if let Some(record) = self
124                .get(TransactionLookup {
125                    transaction_id: locator.transaction_id,
126                    session_identity: Some(identity.clone()),
127                })
128                .await?
129                && !matches!(
130                    record.state,
131                    TransactionState::Completed
132                        | TransactionState::Canceled
133                        | TransactionState::Failed
134                )
135            {
136                records.push(record);
137            }
138        }
139
140        records
141            .sort_by(|left, right| left.transaction_id.as_str().cmp(right.transaction_id.as_str()));
142        Ok(records)
143    }
144}
145
146#[cfg(test)]
147mod tests {
148    use std::collections::HashMap;
149
150    use adk_artifact::InMemoryArtifactService;
151    use adk_core::identity::{AdkIdentity, AppName, SessionId, UserId};
152    use adk_core::{Content, Event, EventCompaction};
153    use adk_memory::{InMemoryMemoryService, MemoryService, SearchRequest};
154    use adk_session::{CreateRequest, GetRequest, InMemorySessionService, SessionService};
155    use chrono::{TimeZone, Utc};
156
157    use super::*;
158    use crate::domain::{
159        Cart, CartLine, CommerceActor, CommerceActorRole, CommerceMode, EvidenceReference,
160        InterventionKind, InterventionState, InterventionStatus, MerchantRef, Money,
161        ProtocolDescriptor, ProtocolEnvelopeDigest, ProtocolExtensions, TransactionId,
162        TransactionState,
163    };
164    use crate::journal::ArtifactBackedEvidenceStore;
165    use crate::kernel::commands::{EvidenceLookup, StoreEvidenceCommand};
166    use crate::kernel::service::EvidenceStore;
167
168    async fn create_identity(session_service: &InMemorySessionService) -> AdkIdentity {
169        let identity = AdkIdentity::new(
170            AppName::try_from("payments-app").unwrap(),
171            UserId::try_from("user-1").unwrap(),
172            SessionId::try_from("session-1").unwrap(),
173        );
174
175        session_service
176            .create(CreateRequest {
177                app_name: identity.app_name.as_ref().to_string(),
178                user_id: identity.user_id.as_ref().to_string(),
179                session_id: Some(identity.session_id.as_ref().to_string()),
180                state: HashMap::new(),
181            })
182            .await
183            .unwrap();
184
185        identity
186    }
187
188    fn sample_record(transaction_id: &str, identity: &AdkIdentity) -> TransactionRecord {
189        let mut record = TransactionRecord::new(
190            TransactionId::from(transaction_id),
191            CommerceActor {
192                actor_id: "shopper-agent".to_string(),
193                role: CommerceActorRole::AgentSurface,
194                display_name: Some("shopper".to_string()),
195                tenant_id: Some("tenant-1".to_string()),
196                extensions: ProtocolExtensions::default(),
197            },
198            MerchantRef {
199                merchant_id: "merchant-123".to_string(),
200                legal_name: "Merchant Example LLC".to_string(),
201                display_name: Some("Merchant Example".to_string()),
202                statement_descriptor: Some("MERCHANT*EXAMPLE".to_string()),
203                country_code: Some("US".to_string()),
204                website: Some("https://merchant.example".to_string()),
205                extensions: ProtocolExtensions::default(),
206            },
207            CommerceMode::HumanPresent,
208            Cart {
209                cart_id: Some(format!("cart-{transaction_id}")),
210                lines: vec![CartLine {
211                    line_id: format!("line-{transaction_id}"),
212                    merchant_sku: Some("sku-123".to_string()),
213                    title: "Widget".to_string(),
214                    quantity: 1,
215                    unit_price: Money::new("USD", 1_500, 2),
216                    total_price: Money::new("USD", 1_500, 2),
217                    product_class: Some("widgets".to_string()),
218                    extensions: ProtocolExtensions::default(),
219                }],
220                subtotal: Some(Money::new("USD", 1_500, 2)),
221                adjustments: Vec::new(),
222                total: Money::new("USD", 1_500, 2),
223                affiliate_attribution: None,
224                extensions: ProtocolExtensions::default(),
225            },
226            Utc.with_ymd_and_hms(2026, 3, 22, 10, 0, 0).unwrap(),
227        );
228        record.session_identity = Some(identity.clone());
229        record
230    }
231
232    #[tokio::test]
233    async fn recalls_unresolved_and_completed_transactions_after_compaction_like_history_loss() {
234        let session_service = Arc::new(InMemorySessionService::new());
235        let identity = create_identity(session_service.as_ref()).await;
236        let store = SessionBackedTransactionStore::new(session_service.clone());
237
238        let mut unresolved = sample_record("tx-unresolved", &identity);
239        unresolved
240            .transition_to(
241                TransactionState::Negotiating,
242                Utc.with_ymd_and_hms(2026, 3, 22, 10, 5, 0).unwrap(),
243            )
244            .unwrap();
245        unresolved
246            .transition_to(
247                TransactionState::InterventionRequired(Box::new(InterventionState {
248                    intervention_id: "int-1".to_string(),
249                    kind: InterventionKind::BuyerReconfirmation,
250                    status: InterventionStatus::Pending,
251                    instructions: Some(
252                        "return to the user for explicit reconfirmation".to_string(),
253                    ),
254                    continuation_token: Some("continue-123".to_string()),
255                    requested_by: None,
256                    expires_at: None,
257                    extensions: ProtocolExtensions::default(),
258                })),
259                Utc.with_ymd_and_hms(2026, 3, 22, 10, 10, 0).unwrap(),
260            )
261            .unwrap();
262        store.upsert(unresolved.clone()).await.unwrap();
263
264        let mut completed = sample_record("tx-completed", &identity);
265        completed
266            .transition_to(
267                TransactionState::Negotiating,
268                Utc.with_ymd_and_hms(2026, 3, 22, 10, 5, 0).unwrap(),
269            )
270            .unwrap();
271        completed
272            .transition_to(
273                TransactionState::AwaitingPaymentMethod,
274                Utc.with_ymd_and_hms(2026, 3, 22, 10, 6, 0).unwrap(),
275            )
276            .unwrap();
277        completed
278            .transition_to(
279                TransactionState::Authorized,
280                Utc.with_ymd_and_hms(2026, 3, 22, 10, 7, 0).unwrap(),
281            )
282            .unwrap();
283        completed
284            .transition_to(
285                TransactionState::Completed,
286                Utc.with_ymd_and_hms(2026, 3, 22, 10, 8, 0).unwrap(),
287            )
288            .unwrap();
289        store.upsert(completed.clone()).await.unwrap();
290
291        let mut compaction_event = Event::new("runner.compaction");
292        compaction_event.author = "runner".to_string();
293        compaction_event.set_content(Content::new("system").with_text("compacted payment history"));
294        compaction_event.actions.compaction = Some(EventCompaction {
295            start_timestamp: unresolved.created_at,
296            end_timestamp: Utc.with_ymd_and_hms(2026, 3, 22, 10, 20, 0).unwrap(),
297            compacted_content: Content::new("system").with_text("older conversation compacted"),
298        });
299        session_service
300            .append_event_for_identity(adk_session::AppendEventRequest {
301                identity: identity.clone(),
302                event: compaction_event,
303            })
304            .await
305            .unwrap();
306
307        let recent_session = session_service
308            .get(GetRequest {
309                app_name: identity.app_name.as_ref().to_string(),
310                user_id: identity.user_id.as_ref().to_string(),
311                session_id: identity.session_id.as_ref().to_string(),
312                num_recent_events: Some(1),
313                after: None,
314            })
315            .await
316            .unwrap();
317        assert_eq!(recent_session.events().len(), 1);
318
319        let unresolved_after = store
320            .list_unresolved(ListUnresolvedTransactionsRequest {
321                session_identity: Some(identity.clone()),
322            })
323            .await
324            .unwrap();
325        assert_eq!(unresolved_after.len(), 1);
326        assert_eq!(unresolved_after[0].transaction_id.as_str(), "tx-unresolved");
327        assert_eq!(
328            unresolved_after[0].safe_summary.state,
329            crate::domain::TransactionStateTag::InterventionRequired
330        );
331
332        let completed_after = store
333            .get(TransactionLookup {
334                transaction_id: TransactionId::from("tx-completed"),
335                session_identity: Some(identity.clone()),
336            })
337            .await
338            .unwrap()
339            .unwrap();
340        assert_eq!(completed_after.state, TransactionState::Completed);
341    }
342
343    #[tokio::test]
344    async fn keeps_raw_evidence_out_of_state_transcript_and_memory() {
345        let session_service = Arc::new(InMemorySessionService::new());
346        let artifact_service = Arc::new(InMemoryArtifactService::new());
347        let memory_service = Arc::new(InMemoryMemoryService::new());
348        let identity = create_identity(session_service.as_ref()).await;
349        let store = SessionBackedTransactionStore::new(session_service.clone())
350            .with_memory_index(PaymentMemoryIndex::new(memory_service.clone()));
351        let evidence_store = ArtifactBackedEvidenceStore::new(artifact_service);
352
353        let raw_secret =
354            r#"{"pan":"4111111111111111","cvc":"123","signedAuthorization":"signed_blob"}"#;
355
356        let mut record = sample_record("tx-secret", &identity);
357        store.upsert(record.clone()).await.unwrap();
358
359        let stored = evidence_store
360            .store(StoreEvidenceCommand {
361                transaction_id: record.transaction_id.clone(),
362                session_identity: Some(identity.clone()),
363                evidence_ref: EvidenceReference {
364                    evidence_id: "payment-mandate-raw".to_string(),
365                    protocol: ProtocolDescriptor::ap2("v0.1-alpha"),
366                    artifact_kind: "payment_mandate".to_string(),
367                    digest: None,
368                },
369                body: raw_secret.as_bytes().to_vec(),
370                content_type: "application/json".to_string(),
371            })
372            .await
373            .unwrap();
374
375        record.attach_evidence_ref(stored.evidence_ref.clone());
376        record.attach_evidence_digest(ProtocolEnvelopeDigest::new(
377            stored.evidence_ref.clone(),
378            Utc.with_ymd_and_hms(2026, 3, 22, 10, 11, 0).unwrap(),
379        ));
380        store.upsert(record.clone()).await.unwrap();
381
382        let session = session_service.get_for_identity(&identity).await.unwrap();
383        let state_json = serde_json::to_string(&session.state().all()).unwrap();
384        assert!(!state_json.contains("4111111111111111"));
385        assert!(!state_json.contains("signed_blob"));
386
387        let transcript_text = session
388            .events()
389            .all()
390            .into_iter()
391            .filter_map(|event| event.content().cloned())
392            .flat_map(|content| content.parts.into_iter())
393            .filter_map(|part| part.text().map(ToString::to_string))
394            .collect::<Vec<_>>()
395            .join(" ");
396        assert!(!transcript_text.contains("4111111111111111"));
397        assert!(!transcript_text.contains("signed_blob"));
398        assert!(transcript_text.contains("Merchant Example"));
399
400        let memories = memory_service
401            .search(SearchRequest {
402                query: "Merchant Example Widget".to_string(),
403                user_id: identity.user_id.as_ref().to_string(),
404                app_name: identity.app_name.as_ref().to_string(),
405                limit: None,
406                min_score: None,
407                project_id: None,
408            })
409            .await
410            .unwrap();
411        let memory_text = memories
412            .memories
413            .into_iter()
414            .flat_map(|entry| entry.content.parts.into_iter())
415            .filter_map(|part| part.text().map(ToString::to_string))
416            .collect::<Vec<_>>()
417            .join(" ");
418        assert!(!memory_text.contains("4111111111111111"));
419        assert!(!memory_text.contains("signed_blob"));
420        assert!(memory_text.contains("Merchant Example"));
421
422        let loaded = evidence_store
423            .load(EvidenceLookup {
424                evidence_ref: stored.evidence_ref.clone(),
425                session_identity: Some(identity.clone()),
426            })
427            .await
428            .unwrap()
429            .unwrap();
430        assert_eq!(loaded.body, raw_secret.as_bytes());
431        assert_eq!(loaded.content_type, "application/json");
432    }
433
434    #[tokio::test]
435    async fn redacts_sensitive_summary_data_before_transcript_and_memory_writes() {
436        let session_service = Arc::new(InMemorySessionService::new());
437        let memory_service = Arc::new(InMemoryMemoryService::new());
438        let identity = create_identity(session_service.as_ref()).await;
439        let store = SessionBackedTransactionStore::new(session_service.clone())
440            .with_memory_index(PaymentMemoryIndex::new(memory_service.clone()));
441
442        let mut record = sample_record("tx-redacted", &identity);
443        record.cart.lines[0].title = "Widget 4111 1111 1111 1111".to_string();
444        record
445            .transition_to(
446                TransactionState::Negotiating,
447                Utc.with_ymd_and_hms(2026, 3, 22, 10, 5, 0).unwrap(),
448            )
449            .unwrap();
450        record
451            .transition_to(
452                TransactionState::InterventionRequired(Box::new(InterventionState {
453                    intervention_id: "int-summary".to_string(),
454                    kind: InterventionKind::BuyerReconfirmation,
455                    status: InterventionStatus::Pending,
456                    instructions: Some(
457                        "billing address: 123 Main St; email payer@example.com; signature=signed_blob"
458                            .to_string(),
459                    ),
460                    continuation_token: Some("continue-123".to_string()),
461                    requested_by: None,
462                    expires_at: None,
463                    extensions: ProtocolExtensions::default(),
464                })),
465                Utc.with_ymd_and_hms(2026, 3, 22, 10, 6, 0).unwrap(),
466            )
467            .unwrap();
468
469        store.upsert(record.clone()).await.unwrap();
470
471        let session = session_service.get_for_identity(&identity).await.unwrap();
472        let transcript_text = session
473            .events()
474            .all()
475            .into_iter()
476            .filter_map(|event| event.content().cloned())
477            .flat_map(|content| content.parts.into_iter())
478            .filter_map(|part| part.text().map(ToString::to_string))
479            .collect::<Vec<_>>()
480            .join(" ");
481        assert!(!transcript_text.contains("4111 1111 1111 1111"));
482        assert!(!transcript_text.contains("payer@example.com"));
483        assert!(!transcript_text.contains("signed_blob"));
484        assert!(transcript_text.contains("[CARD ****1111]"));
485        assert!(transcript_text.contains("[EMAIL REDACTED]"));
486        assert!(transcript_text.contains("[REDACTED sha256:"));
487
488        let memories = memory_service
489            .search(SearchRequest {
490                query: "Merchant Widget".to_string(),
491                user_id: identity.user_id.as_ref().to_string(),
492                app_name: identity.app_name.as_ref().to_string(),
493                limit: None,
494                min_score: None,
495                project_id: None,
496            })
497            .await
498            .unwrap();
499        let memory_text = memories
500            .memories
501            .into_iter()
502            .flat_map(|entry| entry.content.parts.into_iter())
503            .filter_map(|part| part.text().map(ToString::to_string))
504            .collect::<Vec<_>>()
505            .join(" ");
506        assert!(!memory_text.contains("4111 1111 1111 1111"));
507        assert!(!memory_text.contains("payer@example.com"));
508        assert!(!memory_text.contains("signed_blob"));
509        assert!(memory_text.contains("[CARD ****1111]"));
510        assert!(memory_text.contains("[EMAIL REDACTED]"));
511        assert!(memory_text.contains("[REDACTED sha256:"));
512    }
513}