1use std::sync::Arc;
2
3use adk_core::identity::AdkIdentity;
4use adk_core::{AdkError, ErrorCategory, ErrorComponent, Result};
5use adk_session::{AppendEventRequest, SessionService};
6use async_trait::async_trait;
7
8use crate::domain::{TransactionRecord, TransactionState};
9use crate::journal::memory_index::PaymentMemoryIndex;
10use crate::journal::session_state::{
11 ACTIVE_INDEX_KEY, COMPLETED_INDEX_KEY, TransactionLocator, build_journal_event, parse_locators,
12 parse_record, transaction_state_storage_key,
13};
14use crate::kernel::commands::{ListUnresolvedTransactionsRequest, TransactionLookup};
15use crate::kernel::service::TransactionStore;
16
17pub struct SessionBackedTransactionStore {
19 session_service: Arc<dyn SessionService>,
20 memory_index: Option<PaymentMemoryIndex>,
21}
22
23impl SessionBackedTransactionStore {
24 #[must_use]
26 pub fn new(session_service: Arc<dyn SessionService>) -> Self {
27 Self { session_service, memory_index: None }
28 }
29
30 #[must_use]
32 pub fn with_memory_index(mut self, memory_index: PaymentMemoryIndex) -> Self {
33 self.memory_index = Some(memory_index);
34 self
35 }
36
37 fn require_identity<'a>(
38 identity: &'a Option<AdkIdentity>,
39 code: &'static str,
40 ) -> Result<&'a AdkIdentity> {
41 identity.as_ref().ok_or_else(|| {
42 AdkError::new(
43 ErrorComponent::Session,
44 ErrorCategory::InvalidInput,
45 code,
46 "transaction journal operations require a session identity",
47 )
48 })
49 }
50
51 fn upsert_locator(index: &mut Vec<TransactionLocator>, locator: &TransactionLocator) {
52 index.retain(|existing| existing != locator);
53 index.push(locator.clone());
54 index
55 .sort_by(|left, right| left.transaction_id.as_str().cmp(right.transaction_id.as_str()));
56 }
57
58 fn remove_locator(index: &mut Vec<TransactionLocator>, locator: &TransactionLocator) {
59 index.retain(|existing| existing != locator);
60 }
61}
62
63#[async_trait]
64impl TransactionStore for SessionBackedTransactionStore {
65 async fn upsert(&self, mut record: TransactionRecord) -> Result<()> {
66 record.recompute_safe_summary();
67 let identity =
68 Self::require_identity(&record.session_identity, "payments.journal.identity_required")?
69 .clone();
70 let session = self.session_service.get_for_identity(&identity).await?;
71 let mut active = parse_locators(session.state().get(&format!("app:{ACTIVE_INDEX_KEY}")))?;
72 let mut completed =
73 parse_locators(session.state().get(&format!("app:{COMPLETED_INDEX_KEY}")))?;
74 let locator = TransactionLocator {
75 identity: identity.clone(),
76 transaction_id: record.transaction_id.clone(),
77 };
78
79 if record.state.is_terminal() {
80 Self::remove_locator(&mut active, &locator);
81 Self::upsert_locator(&mut completed, &locator);
82 } else {
83 Self::upsert_locator(&mut active, &locator);
84 Self::remove_locator(&mut completed, &locator);
85 }
86
87 let event = build_journal_event(&record, &active, &completed)?;
88 self.session_service
89 .append_event_for_identity(AppendEventRequest { identity: identity.clone(), event })
90 .await?;
91
92 if let Some(memory_index) = &self.memory_index {
93 memory_index.index_summary(&identity, &record.safe_summary).await?;
94 }
95
96 Ok(())
97 }
98
99 async fn get(&self, lookup: TransactionLookup) -> Result<Option<TransactionRecord>> {
100 let identity =
101 Self::require_identity(&lookup.session_identity, "payments.journal.lookup_identity")?;
102 let session = self.session_service.get_for_identity(identity).await?;
103 let key = transaction_state_storage_key(identity, &lookup.transaction_id);
104 match session.state().get(&key) {
105 Some(value) => parse_record(value).map(Some),
106 None => Ok(None),
107 }
108 }
109
110 async fn list_unresolved(
111 &self,
112 request: ListUnresolvedTransactionsRequest,
113 ) -> Result<Vec<TransactionRecord>> {
114 let identity = Self::require_identity(
115 &request.session_identity,
116 "payments.journal.list_identity_required",
117 )?;
118 let session = self.session_service.get_for_identity(identity).await?;
119 let active = parse_locators(session.state().get(&format!("app:{ACTIVE_INDEX_KEY}")))?;
120 let mut records = Vec::new();
121
122 for locator in active.into_iter().filter(|locator| &locator.identity == identity) {
123 if let Some(record) = self
124 .get(TransactionLookup {
125 transaction_id: locator.transaction_id,
126 session_identity: Some(identity.clone()),
127 })
128 .await?
129 && !matches!(
130 record.state,
131 TransactionState::Completed
132 | TransactionState::Canceled
133 | TransactionState::Failed
134 )
135 {
136 records.push(record);
137 }
138 }
139
140 records
141 .sort_by(|left, right| left.transaction_id.as_str().cmp(right.transaction_id.as_str()));
142 Ok(records)
143 }
144}
145
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use adk_artifact::InMemoryArtifactService;
    use adk_core::identity::{AdkIdentity, AppName, SessionId, UserId};
    use adk_core::{Content, Event, EventCompaction};
    use adk_memory::{InMemoryMemoryService, MemoryService, SearchRequest};
    use adk_session::{CreateRequest, GetRequest, InMemorySessionService, SessionService};
    use chrono::{TimeZone, Utc};

    use super::*;
    use crate::domain::{
        Cart, CartLine, CommerceActor, CommerceActorRole, CommerceMode, EvidenceReference,
        InterventionKind, InterventionState, InterventionStatus, MerchantRef, Money,
        ProtocolDescriptor, ProtocolEnvelopeDigest, ProtocolExtensions, TransactionId,
        TransactionState,
    };
    use crate::journal::ArtifactBackedEvidenceStore;
    use crate::kernel::commands::{EvidenceLookup, StoreEvidenceCommand};
    use crate::kernel::service::EvidenceStore;

    /// Builds a UTC timestamp on 2026-03-22 at `hour:minute:00`, the day all fixtures use.
    fn at(hour: u32, minute: u32) -> chrono::DateTime<Utc> {
        Utc.with_ymd_and_hms(2026, 3, 22, hour, minute, 0).unwrap()
    }

    /// Builds an `InterventionRequired` state holding a pending buyer reconfirmation.
    fn pending_reconfirmation(intervention_id: &str, instructions: &str) -> TransactionState {
        TransactionState::InterventionRequired(Box::new(InterventionState {
            intervention_id: intervention_id.to_string(),
            kind: InterventionKind::BuyerReconfirmation,
            status: InterventionStatus::Pending,
            instructions: Some(instructions.to_string()),
            continuation_token: Some("continue-123".to_string()),
            requested_by: None,
            expires_at: None,
            extensions: ProtocolExtensions::default(),
        }))
    }

    /// Builds a memory search request for `query` scoped to the identity's user and app.
    fn memory_query(identity: &AdkIdentity, query: &str) -> SearchRequest {
        SearchRequest {
            query: query.to_string(),
            user_id: identity.user_id.as_ref().to_string(),
            app_name: identity.app_name.as_ref().to_string(),
            limit: None,
            min_score: None,
            project_id: None,
        }
    }

    /// Asserts `text` has neither raw evidence marker but still names the merchant.
    fn assert_free_of_raw_evidence(text: &str) {
        assert!(!text.contains("4111111111111111"));
        assert!(!text.contains("signed_blob"));
        assert!(text.contains("Merchant Example"));
    }

    /// Asserts `text` shows the redaction placeholders instead of the sensitive values.
    fn assert_summary_redacted(text: &str) {
        assert!(!text.contains("4111 1111 1111 1111"));
        assert!(!text.contains("payer@example.com"));
        assert!(!text.contains("signed_blob"));
        assert!(text.contains("[CARD ****1111]"));
        assert!(text.contains("[EMAIL REDACTED]"));
        assert!(text.contains("[REDACTED sha256:"));
    }

    /// Creates the fixed test session in `session_service` and returns its identity.
    async fn create_identity(session_service: &InMemorySessionService) -> AdkIdentity {
        let app_name = AppName::try_from("payments-app").unwrap();
        let user_id = UserId::try_from("user-1").unwrap();
        let session_id = SessionId::try_from("session-1").unwrap();
        let identity = AdkIdentity::new(app_name, user_id, session_id);

        let request = CreateRequest {
            app_name: identity.app_name.as_ref().to_string(),
            user_id: identity.user_id.as_ref().to_string(),
            session_id: Some(identity.session_id.as_ref().to_string()),
            state: HashMap::new(),
        };
        session_service.create(request).await.unwrap();

        identity
    }

    /// Builds a one-line widget purchase record bound to `identity`.
    fn sample_record(transaction_id: &str, identity: &AdkIdentity) -> TransactionRecord {
        // Every monetary field of the fixture uses the same amount.
        let price = || Money::new("USD", 1_500, 2);

        let actor = CommerceActor {
            actor_id: "shopper-agent".to_string(),
            role: CommerceActorRole::AgentSurface,
            display_name: Some("shopper".to_string()),
            tenant_id: Some("tenant-1".to_string()),
            extensions: ProtocolExtensions::default(),
        };
        let merchant = MerchantRef {
            merchant_id: "merchant-123".to_string(),
            legal_name: "Merchant Example LLC".to_string(),
            display_name: Some("Merchant Example".to_string()),
            statement_descriptor: Some("MERCHANT*EXAMPLE".to_string()),
            country_code: Some("US".to_string()),
            website: Some("https://merchant.example".to_string()),
            extensions: ProtocolExtensions::default(),
        };
        let widget_line = CartLine {
            line_id: format!("line-{transaction_id}"),
            merchant_sku: Some("sku-123".to_string()),
            title: "Widget".to_string(),
            quantity: 1,
            unit_price: price(),
            total_price: price(),
            product_class: Some("widgets".to_string()),
            extensions: ProtocolExtensions::default(),
        };
        let cart = Cart {
            cart_id: Some(format!("cart-{transaction_id}")),
            lines: vec![widget_line],
            subtotal: Some(price()),
            adjustments: Vec::new(),
            total: price(),
            affiliate_attribution: None,
            extensions: ProtocolExtensions::default(),
        };

        let mut record = TransactionRecord::new(
            TransactionId::from(transaction_id),
            actor,
            merchant,
            CommerceMode::HumanPresent,
            cart,
            at(10, 0),
        );
        record.session_identity = Some(identity.clone());
        record
    }

    /// Stored transactions stay retrievable after a compaction event is appended
    /// and only the most recent event is requested from the session.
    #[tokio::test]
    async fn recalls_unresolved_and_completed_transactions_after_compaction_like_history_loss() {
        let session_service = Arc::new(InMemorySessionService::new());
        let identity = create_identity(session_service.as_ref()).await;
        let store = SessionBackedTransactionStore::new(session_service.clone());

        let mut pending = sample_record("tx-unresolved", &identity);
        pending.transition_to(TransactionState::Negotiating, at(10, 5)).unwrap();
        pending
            .transition_to(
                pending_reconfirmation("int-1", "return to the user for explicit reconfirmation"),
                at(10, 10),
            )
            .unwrap();
        store.upsert(pending.clone()).await.unwrap();

        let mut finished = sample_record("tx-completed", &identity);
        let path_to_completion = [
            (TransactionState::Negotiating, 5),
            (TransactionState::AwaitingPaymentMethod, 6),
            (TransactionState::Authorized, 7),
            (TransactionState::Completed, 8),
        ];
        for (next_state, minute) in path_to_completion {
            finished.transition_to(next_state, at(10, minute)).unwrap();
        }
        store.upsert(finished).await.unwrap();

        let mut compaction_event = Event::new("runner.compaction");
        compaction_event.author = "runner".to_string();
        compaction_event.set_content(Content::new("system").with_text("compacted payment history"));
        compaction_event.actions.compaction = Some(EventCompaction {
            start_timestamp: pending.created_at,
            end_timestamp: at(10, 20),
            compacted_content: Content::new("system").with_text("older conversation compacted"),
        });
        session_service
            .append_event_for_identity(AppendEventRequest {
                identity: identity.clone(),
                event: compaction_event,
            })
            .await
            .unwrap();

        let recent_request = GetRequest {
            app_name: identity.app_name.as_ref().to_string(),
            user_id: identity.user_id.as_ref().to_string(),
            session_id: identity.session_id.as_ref().to_string(),
            num_recent_events: Some(1),
            after: None,
        };
        let recent_session = session_service.get(recent_request).await.unwrap();
        assert_eq!(recent_session.events().len(), 1);

        let still_open = store
            .list_unresolved(ListUnresolvedTransactionsRequest {
                session_identity: Some(identity.clone()),
            })
            .await
            .unwrap();
        assert_eq!(still_open.len(), 1);
        assert_eq!(still_open[0].transaction_id.as_str(), "tx-unresolved");
        assert_eq!(
            still_open[0].safe_summary.state,
            crate::domain::TransactionStateTag::InterventionRequired
        );

        let reloaded = store
            .get(TransactionLookup {
                transaction_id: TransactionId::from("tx-completed"),
                session_identity: Some(identity.clone()),
            })
            .await
            .unwrap()
            .unwrap();
        assert_eq!(reloaded.state, TransactionState::Completed);
    }

    /// Raw evidence bytes are loadable from the evidence store but never appear
    /// in session state, transcript text, or memory entries.
    #[tokio::test]
    async fn keeps_raw_evidence_out_of_state_transcript_and_memory() {
        let session_service = Arc::new(InMemorySessionService::new());
        let artifact_service = Arc::new(InMemoryArtifactService::new());
        let memory_service = Arc::new(InMemoryMemoryService::new());
        let identity = create_identity(session_service.as_ref()).await;
        let store = SessionBackedTransactionStore::new(session_service.clone())
            .with_memory_index(PaymentMemoryIndex::new(memory_service.clone()));
        let evidence_store = ArtifactBackedEvidenceStore::new(artifact_service);

        let raw_secret =
            r#"{"pan":"4111111111111111","cvc":"123","signedAuthorization":"signed_blob"}"#;

        let mut record = sample_record("tx-secret", &identity);
        store.upsert(record.clone()).await.unwrap();

        let evidence_command = StoreEvidenceCommand {
            transaction_id: record.transaction_id.clone(),
            session_identity: Some(identity.clone()),
            evidence_ref: EvidenceReference {
                evidence_id: "payment-mandate-raw".to_string(),
                protocol: ProtocolDescriptor::ap2("v0.1-alpha"),
                artifact_kind: "payment_mandate".to_string(),
                digest: None,
            },
            body: raw_secret.as_bytes().to_vec(),
            content_type: "application/json".to_string(),
        };
        let stored = evidence_store.store(evidence_command).await.unwrap();

        record.attach_evidence_ref(stored.evidence_ref.clone());
        record.attach_evidence_digest(ProtocolEnvelopeDigest::new(
            stored.evidence_ref.clone(),
            at(10, 11),
        ));
        store.upsert(record.clone()).await.unwrap();

        let session = session_service.get_for_identity(&identity).await.unwrap();
        let state_json = serde_json::to_string(&session.state().all()).unwrap();
        assert!(!state_json.contains("4111111111111111"));
        assert!(!state_json.contains("signed_blob"));

        let transcript_text = session
            .events()
            .all()
            .into_iter()
            .filter_map(|logged| logged.content().cloned())
            .flat_map(|body| body.parts)
            .filter_map(|piece| piece.text().map(ToString::to_string))
            .collect::<Vec<_>>()
            .join(" ");
        assert_free_of_raw_evidence(&transcript_text);

        let memories = memory_service
            .search(memory_query(&identity, "Merchant Example Widget"))
            .await
            .unwrap();
        let memory_text = memories
            .memories
            .into_iter()
            .flat_map(|remembered| remembered.content.parts)
            .filter_map(|piece| piece.text().map(ToString::to_string))
            .collect::<Vec<_>>()
            .join(" ");
        assert_free_of_raw_evidence(&memory_text);

        let loaded = evidence_store
            .load(EvidenceLookup {
                evidence_ref: stored.evidence_ref.clone(),
                session_identity: Some(identity.clone()),
            })
            .await
            .unwrap()
            .unwrap();
        assert_eq!(loaded.body, raw_secret.as_bytes());
        assert_eq!(loaded.content_type, "application/json");
    }

    /// Sensitive values placed in the cart title and intervention instructions
    /// appear only as redaction placeholders in transcript text and memory entries.
    #[tokio::test]
    async fn redacts_sensitive_summary_data_before_transcript_and_memory_writes() {
        let session_service = Arc::new(InMemorySessionService::new());
        let memory_service = Arc::new(InMemoryMemoryService::new());
        let identity = create_identity(session_service.as_ref()).await;
        let store = SessionBackedTransactionStore::new(session_service.clone())
            .with_memory_index(PaymentMemoryIndex::new(memory_service.clone()));

        let mut record = sample_record("tx-redacted", &identity);
        record.cart.lines[0].title = "Widget 4111 1111 1111 1111".to_string();
        record.transition_to(TransactionState::Negotiating, at(10, 5)).unwrap();
        record
            .transition_to(
                pending_reconfirmation(
                    "int-summary",
                    "billing address: 123 Main St; email payer@example.com; signature=signed_blob",
                ),
                at(10, 6),
            )
            .unwrap();

        store.upsert(record).await.unwrap();

        let session = session_service.get_for_identity(&identity).await.unwrap();
        let transcript_text = session
            .events()
            .all()
            .into_iter()
            .filter_map(|logged| logged.content().cloned())
            .flat_map(|body| body.parts)
            .filter_map(|piece| piece.text().map(ToString::to_string))
            .collect::<Vec<_>>()
            .join(" ");
        assert_summary_redacted(&transcript_text);

        let memories =
            memory_service.search(memory_query(&identity, "Merchant Widget")).await.unwrap();
        let memory_text = memories
            .memories
            .into_iter()
            .flat_map(|remembered| remembered.content.parts)
            .filter_map(|piece| piece.text().map(ToString::to_string))
            .collect::<Vec<_>>()
            .join(" ");
        assert_summary_redacted(&memory_text);
    }
}