1use std::sync::Arc;
2
3use adk_core::identity::AdkIdentity;
4use adk_core::{AdkError, ErrorCategory, ErrorComponent, Result};
5use adk_session::{AppendEventRequest, SessionService};
6use async_trait::async_trait;
7
8use crate::domain::{TransactionRecord, TransactionState};
9use crate::journal::memory_index::PaymentMemoryIndex;
10use crate::journal::session_state::{
11 ACTIVE_INDEX_KEY, COMPLETED_INDEX_KEY, TransactionLocator, build_journal_event, parse_locators,
12 parse_record, transaction_state_storage_key,
13};
14use crate::kernel::commands::{ListUnresolvedTransactionsRequest, TransactionLookup};
15use crate::kernel::service::TransactionStore;
16
17pub struct SessionBackedTransactionStore {
19 session_service: Arc<dyn SessionService>,
20 memory_index: Option<PaymentMemoryIndex>,
21}
22
23impl SessionBackedTransactionStore {
24 #[must_use]
26 pub fn new(session_service: Arc<dyn SessionService>) -> Self {
27 Self { session_service, memory_index: None }
28 }
29
30 #[must_use]
32 pub fn with_memory_index(mut self, memory_index: PaymentMemoryIndex) -> Self {
33 self.memory_index = Some(memory_index);
34 self
35 }
36
37 fn require_identity<'a>(
38 identity: &'a Option<AdkIdentity>,
39 code: &'static str,
40 ) -> Result<&'a AdkIdentity> {
41 identity.as_ref().ok_or_else(|| {
42 AdkError::new(
43 ErrorComponent::Session,
44 ErrorCategory::InvalidInput,
45 code,
46 "transaction journal operations require a session identity",
47 )
48 })
49 }
50
51 fn upsert_locator(index: &mut Vec<TransactionLocator>, locator: &TransactionLocator) {
52 index.retain(|existing| existing != locator);
53 index.push(locator.clone());
54 index
55 .sort_by(|left, right| left.transaction_id.as_str().cmp(right.transaction_id.as_str()));
56 }
57
58 fn remove_locator(index: &mut Vec<TransactionLocator>, locator: &TransactionLocator) {
59 index.retain(|existing| existing != locator);
60 }
61}
62
63#[async_trait]
64impl TransactionStore for SessionBackedTransactionStore {
65 async fn upsert(&self, mut record: TransactionRecord) -> Result<()> {
66 record.recompute_safe_summary();
67 let identity =
68 Self::require_identity(&record.session_identity, "payments.journal.identity_required")?
69 .clone();
70 let session = self.session_service.get_for_identity(&identity).await?;
71 let mut active = parse_locators(session.state().get(&format!("app:{ACTIVE_INDEX_KEY}")))?;
72 let mut completed =
73 parse_locators(session.state().get(&format!("app:{COMPLETED_INDEX_KEY}")))?;
74 let locator = TransactionLocator {
75 identity: identity.clone(),
76 transaction_id: record.transaction_id.clone(),
77 };
78
79 if record.state.is_terminal() {
80 Self::remove_locator(&mut active, &locator);
81 Self::upsert_locator(&mut completed, &locator);
82 } else {
83 Self::upsert_locator(&mut active, &locator);
84 Self::remove_locator(&mut completed, &locator);
85 }
86
87 let event = build_journal_event(&record, &active, &completed)?;
88 self.session_service
89 .append_event_for_identity(AppendEventRequest { identity: identity.clone(), event })
90 .await?;
91
92 if let Some(memory_index) = &self.memory_index {
93 memory_index.index_summary(&identity, &record.safe_summary).await?;
94 }
95
96 Ok(())
97 }
98
99 async fn get(&self, lookup: TransactionLookup) -> Result<Option<TransactionRecord>> {
100 let identity =
101 Self::require_identity(&lookup.session_identity, "payments.journal.lookup_identity")?;
102 let session = self.session_service.get_for_identity(identity).await?;
103 let key = transaction_state_storage_key(identity, &lookup.transaction_id);
104 match session.state().get(&key) {
105 Some(value) => parse_record(value).map(Some),
106 None => Ok(None),
107 }
108 }
109
110 async fn list_unresolved(
111 &self,
112 request: ListUnresolvedTransactionsRequest,
113 ) -> Result<Vec<TransactionRecord>> {
114 let identity = Self::require_identity(
115 &request.session_identity,
116 "payments.journal.list_identity_required",
117 )?;
118 let session = self.session_service.get_for_identity(identity).await?;
119 let active = parse_locators(session.state().get(&format!("app:{ACTIVE_INDEX_KEY}")))?;
120 let mut records = Vec::new();
121
122 for locator in active.into_iter().filter(|locator| &locator.identity == identity) {
123 if let Some(record) = self
124 .get(TransactionLookup {
125 transaction_id: locator.transaction_id,
126 session_identity: Some(identity.clone()),
127 })
128 .await?
129 {
130 if !matches!(
131 record.state,
132 TransactionState::Completed
133 | TransactionState::Canceled
134 | TransactionState::Failed
135 ) {
136 records.push(record);
137 }
138 }
139 }
140
141 records
142 .sort_by(|left, right| left.transaction_id.as_str().cmp(right.transaction_id.as_str()));
143 Ok(records)
144 }
145}
146
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use adk_artifact::InMemoryArtifactService;
    use adk_core::identity::{AdkIdentity, AppName, SessionId, UserId};
    use adk_core::{Content, Event, EventCompaction};
    use adk_memory::{InMemoryMemoryService, MemoryService, SearchRequest};
    use adk_session::{CreateRequest, GetRequest, InMemorySessionService, SessionService};
    use chrono::{TimeZone, Utc};

    use super::*;
    use crate::domain::{
        Cart, CartLine, CommerceActor, CommerceActorRole, CommerceMode, EvidenceReference,
        InterventionKind, InterventionState, InterventionStatus, MerchantRef, Money,
        ProtocolDescriptor, ProtocolEnvelopeDigest, ProtocolExtensions, TransactionId,
        TransactionState,
    };
    use crate::journal::ArtifactBackedEvidenceStore;
    use crate::kernel::commands::{EvidenceLookup, StoreEvidenceCommand};
    use crate::kernel::service::EvidenceStore;

    /// Timestamp on the fixture day (2026-03-22, UTC) at `hour:minute:00`.
    fn at(hour: u32, minute: u32) -> chrono::DateTime<Utc> {
        Utc.with_ymd_and_hms(2026, 3, 22, hour, minute, 0).unwrap()
    }

    /// Applies each `(state, timestamp)` transition to `record` in order, panicking on failure.
    fn advance(
        record: &mut TransactionRecord,
        steps: impl IntoIterator<Item = (TransactionState, chrono::DateTime<Utc>)>,
    ) {
        for (state, timestamp) in steps {
            record.transition_to(state, timestamp).unwrap();
        }
    }

    /// Builds a pending buyer-reconfirmation intervention state with the given id and text.
    fn pending_reconfirmation(intervention_id: &str, instructions: &str) -> TransactionState {
        TransactionState::InterventionRequired(Box::new(InterventionState {
            intervention_id: intervention_id.to_string(),
            kind: InterventionKind::BuyerReconfirmation,
            status: InterventionStatus::Pending,
            instructions: Some(instructions.to_string()),
            continuation_token: Some("continue-123".to_string()),
            requested_by: None,
            expires_at: None,
            extensions: ProtocolExtensions::default(),
        }))
    }

    /// Joins the text of every part in `contents` with single spaces.
    fn joined_text(contents: impl IntoIterator<Item = Content>) -> String {
        contents
            .into_iter()
            .flat_map(|content| content.parts.into_iter())
            .filter_map(|part| part.text().map(ToString::to_string))
            .collect::<Vec<_>>()
            .join(" ")
    }

    /// Joins the text parts of every event that carries content.
    fn transcript_of(events: Vec<Event>) -> String {
        joined_text(events.into_iter().filter_map(|event| event.content().cloned()))
    }

    /// Asserts that neither the raw card number nor the signed blob appears in `text`.
    fn assert_no_raw_evidence(text: &str) {
        assert!(!text.contains("4111111111111111"));
        assert!(!text.contains("signed_blob"));
    }

    /// Asserts that `text` carries only the redacted forms of the sensitive summary data.
    fn assert_summary_redacted(text: &str) {
        assert!(!text.contains("4111 1111 1111 1111"));
        assert!(!text.contains("payer@example.com"));
        assert!(!text.contains("signed_blob"));
        assert!(text.contains("[CARD ****1111]"));
        assert!(text.contains("[EMAIL REDACTED]"));
        assert!(text.contains("[REDACTED sha256:"));
    }

    /// Creates the fixture session in `session_service` and returns its identity.
    async fn create_identity(session_service: &InMemorySessionService) -> AdkIdentity {
        let identity = AdkIdentity::new(
            AppName::try_from("payments-app").unwrap(),
            UserId::try_from("user-1").unwrap(),
            SessionId::try_from("session-1").unwrap(),
        );
        let request = CreateRequest {
            app_name: identity.app_name.as_ref().to_string(),
            user_id: identity.user_id.as_ref().to_string(),
            session_id: Some(identity.session_id.as_ref().to_string()),
            state: HashMap::new(),
        };
        session_service.create(request).await.unwrap();
        identity
    }

    /// Builds a single-line, 15.00 USD record for `transaction_id` bound to `identity`.
    fn sample_record(transaction_id: &str, identity: &AdkIdentity) -> TransactionRecord {
        let price = || Money::new("USD", 1_500, 2);

        let actor = CommerceActor {
            actor_id: "shopper-agent".to_string(),
            role: CommerceActorRole::AgentSurface,
            display_name: Some("shopper".to_string()),
            tenant_id: Some("tenant-1".to_string()),
            extensions: ProtocolExtensions::default(),
        };
        let merchant = MerchantRef {
            merchant_id: "merchant-123".to_string(),
            legal_name: "Merchant Example LLC".to_string(),
            display_name: Some("Merchant Example".to_string()),
            statement_descriptor: Some("MERCHANT*EXAMPLE".to_string()),
            country_code: Some("US".to_string()),
            website: Some("https://merchant.example".to_string()),
            extensions: ProtocolExtensions::default(),
        };
        let line = CartLine {
            line_id: format!("line-{transaction_id}"),
            merchant_sku: Some("sku-123".to_string()),
            title: "Widget".to_string(),
            quantity: 1,
            unit_price: price(),
            total_price: price(),
            product_class: Some("widgets".to_string()),
            extensions: ProtocolExtensions::default(),
        };
        let cart = Cart {
            cart_id: Some(format!("cart-{transaction_id}")),
            lines: vec![line],
            subtotal: Some(price()),
            adjustments: Vec::new(),
            total: price(),
            affiliate_attribution: None,
            extensions: ProtocolExtensions::default(),
        };

        let mut record = TransactionRecord::new(
            TransactionId::from(transaction_id),
            actor,
            merchant,
            CommerceMode::HumanPresent,
            cart,
            at(10, 0),
        );
        record.session_identity = Some(identity.clone());
        record
    }

    #[tokio::test]
    async fn recalls_unresolved_and_completed_transactions_after_compaction_like_history_loss() {
        let session_service = Arc::new(InMemorySessionService::new());
        let identity = create_identity(session_service.as_ref()).await;
        let store = SessionBackedTransactionStore::new(session_service.clone());

        // One transaction parked on a pending intervention...
        let mut unresolved = sample_record("tx-unresolved", &identity);
        advance(
            &mut unresolved,
            [
                (TransactionState::Negotiating, at(10, 5)),
                (
                    pending_reconfirmation(
                        "int-1",
                        "return to the user for explicit reconfirmation",
                    ),
                    at(10, 10),
                ),
            ],
        );
        store.upsert(unresolved.clone()).await.unwrap();

        // ...and one driven all the way to completion.
        let mut completed = sample_record("tx-completed", &identity);
        advance(
            &mut completed,
            [
                (TransactionState::Negotiating, at(10, 5)),
                (TransactionState::AwaitingPaymentMethod, at(10, 6)),
                (TransactionState::Authorized, at(10, 7)),
                (TransactionState::Completed, at(10, 8)),
            ],
        );
        store.upsert(completed.clone()).await.unwrap();

        // Append a compaction event, then read back only the most recent event so the journal
        // events are no longer part of the returned history.
        let mut compaction_event = Event::new("runner.compaction");
        compaction_event.author = "runner".to_string();
        compaction_event.set_content(Content::new("system").with_text("compacted payment history"));
        compaction_event.actions.compaction = Some(EventCompaction {
            start_timestamp: unresolved.created_at,
            end_timestamp: at(10, 20),
            compacted_content: Content::new("system").with_text("older conversation compacted"),
        });
        session_service
            .append_event_for_identity(AppendEventRequest {
                identity: identity.clone(),
                event: compaction_event,
            })
            .await
            .unwrap();

        let recent_request = GetRequest {
            app_name: identity.app_name.as_ref().to_string(),
            user_id: identity.user_id.as_ref().to_string(),
            session_id: identity.session_id.as_ref().to_string(),
            num_recent_events: Some(1),
            after: None,
        };
        let recent_session = session_service.get(recent_request).await.unwrap();
        assert_eq!(recent_session.events().len(), 1);

        let unresolved_after = store
            .list_unresolved(ListUnresolvedTransactionsRequest {
                session_identity: Some(identity.clone()),
            })
            .await
            .unwrap();
        assert_eq!(unresolved_after.len(), 1);
        assert_eq!(unresolved_after[0].transaction_id.as_str(), "tx-unresolved");
        assert_eq!(
            unresolved_after[0].safe_summary.state,
            crate::domain::TransactionStateTag::InterventionRequired
        );

        let completed_lookup = TransactionLookup {
            transaction_id: TransactionId::from("tx-completed"),
            session_identity: Some(identity.clone()),
        };
        let completed_after = store.get(completed_lookup).await.unwrap().unwrap();
        assert_eq!(completed_after.state, TransactionState::Completed);
    }

    #[tokio::test]
    async fn keeps_raw_evidence_out_of_state_transcript_and_memory() {
        let session_service = Arc::new(InMemorySessionService::new());
        let artifact_service = Arc::new(InMemoryArtifactService::new());
        let memory_service = Arc::new(InMemoryMemoryService::new());
        let identity = create_identity(session_service.as_ref()).await;
        let store = SessionBackedTransactionStore::new(session_service.clone())
            .with_memory_index(PaymentMemoryIndex::new(memory_service.clone()));
        let evidence_store = ArtifactBackedEvidenceStore::new(artifact_service);

        let raw_secret =
            r#"{"pan":"4111111111111111","cvc":"123","signedAuthorization":"signed_blob"}"#;

        let mut record = sample_record("tx-secret", &identity);
        store.upsert(record.clone()).await.unwrap();

        let store_command = StoreEvidenceCommand {
            transaction_id: record.transaction_id.clone(),
            session_identity: Some(identity.clone()),
            evidence_ref: EvidenceReference {
                evidence_id: "payment-mandate-raw".to_string(),
                protocol: ProtocolDescriptor::ap2("v0.1-alpha"),
                artifact_kind: "payment_mandate".to_string(),
                digest: None,
            },
            body: raw_secret.as_bytes().to_vec(),
            content_type: "application/json".to_string(),
        };
        let stored = evidence_store.store(store_command).await.unwrap();

        record.attach_evidence_ref(stored.evidence_ref.clone());
        record.attach_evidence_digest(ProtocolEnvelopeDigest::new(
            stored.evidence_ref.clone(),
            at(10, 11),
        ));
        store.upsert(record.clone()).await.unwrap();

        // Session state must not contain the raw payload.
        let session = session_service.get_for_identity(&identity).await.unwrap();
        let state_json = serde_json::to_string(&session.state().all()).unwrap();
        assert_no_raw_evidence(&state_json);

        // Neither must the transcript, which still names the merchant.
        let transcript_text = transcript_of(session.events().all());
        assert_no_raw_evidence(&transcript_text);
        assert!(transcript_text.contains("Merchant Example"));

        // Same for what was indexed into memory.
        let memories = memory_service
            .search(SearchRequest {
                query: "Merchant Example Widget".to_string(),
                user_id: identity.user_id.as_ref().to_string(),
                app_name: identity.app_name.as_ref().to_string(),
                limit: None,
                min_score: None,
            })
            .await
            .unwrap();
        let memory_text = joined_text(memories.memories.into_iter().map(|entry| entry.content));
        assert_no_raw_evidence(&memory_text);
        assert!(memory_text.contains("Merchant Example"));

        // The evidence store still returns the payload unchanged.
        let evidence_lookup = EvidenceLookup {
            evidence_ref: stored.evidence_ref.clone(),
            session_identity: Some(identity.clone()),
        };
        let loaded = evidence_store.load(evidence_lookup).await.unwrap().unwrap();
        assert_eq!(loaded.body, raw_secret.as_bytes());
        assert_eq!(loaded.content_type, "application/json");
    }

    #[tokio::test]
    async fn redacts_sensitive_summary_data_before_transcript_and_memory_writes() {
        let session_service = Arc::new(InMemorySessionService::new());
        let memory_service = Arc::new(InMemoryMemoryService::new());
        let identity = create_identity(session_service.as_ref()).await;
        let store = SessionBackedTransactionStore::new(session_service.clone())
            .with_memory_index(PaymentMemoryIndex::new(memory_service.clone()));

        // Plant a card number in the cart and contact details in the intervention text.
        let mut record = sample_record("tx-redacted", &identity);
        record.cart.lines[0].title = "Widget 4111 1111 1111 1111".to_string();
        advance(
            &mut record,
            [
                (TransactionState::Negotiating, at(10, 5)),
                (
                    pending_reconfirmation(
                        "int-summary",
                        "billing address: 123 Main St; email payer@example.com; signature=signed_blob",
                    ),
                    at(10, 6),
                ),
            ],
        );

        store.upsert(record.clone()).await.unwrap();

        let session = session_service.get_for_identity(&identity).await.unwrap();
        let transcript_text = transcript_of(session.events().all());
        assert_summary_redacted(&transcript_text);

        let memories = memory_service
            .search(SearchRequest {
                query: "Merchant Widget".to_string(),
                user_id: identity.user_id.as_ref().to_string(),
                app_name: identity.app_name.as_ref().to_string(),
                limit: None,
                min_score: None,
            })
            .await
            .unwrap();
        let memory_text = joined_text(memories.memories.into_iter().map(|entry| entry.content));
        assert_summary_redacted(&memory_text);
    }
}