1mod state;
2mod store;
3mod types;
4
5use std::collections::{BTreeMap, BTreeSet, HashMap};
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::sync::{Arc, RwLock};
8use std::time::{Duration, SystemTime, UNIX_EPOCH};
9
10use futures::future::join_all;
11use meerkat_core::{ContentInput, Message};
12use meerkat_mob::MobHandle;
13use meerkat_mob::ids::MeerkatId;
14use meerkat_mob::runtime::MobMemberListEntry;
15use serde_json::{Value, json};
16use sha2::{Digest, Sha256};
17use tokio::sync::{Semaphore, broadcast, oneshot};
18
19use crate::blob_store::BinaryBlobStore;
20use crate::console_contracts::SYSTEM_EVENT_IDENTITY;
21use crate::mob_handle_runtime::{
22 MobRuntime, assert_member_accepts_images, send_message_on_mob_with_mode,
23};
24use crate::runtime::ConsoleMember;
25use crate::unified_runtime::{ConsoleEventStore, UnifiedRuntime};
26
27pub use state::{
28 ReplaySubscriptionEffect, ReplaySubscriptionState, ReplaySubscriptionTransition, SendEffect,
29 SendState, SendTransition, SourceIngestionEffect, SourceIngestionState,
30 SourceIngestionTransition,
31};
32pub use store::{
33 ConsoleLogError, ConsoleLogResult, ConsoleLogStore, InMemoryConsoleLogStore,
34 SqliteConsoleLogStore,
35};
36pub use types::{
37 AppendDisposition, AppendOutcome, ConsoleCursor, ConsoleFrame, ConsoleFrameSource,
38 ConsoleFrameSourceKind, ConsoleFrameStatus, ConsoleIdentityInspection, ConsoleIdentityRecord,
39 ConsoleInteractionAccepted, ConsoleReplayUnavailable, ConsoleSendRequest, ConsoleTimelineEvent,
40 ConsoleTimelinePage, ConsoleTimelineQuery, ConsoleVisibility, NewConsoleFrame,
41};
42
/// Capacity of the timeline broadcast channel created in `new_with_options`;
/// receivers that fall further behind than this observe `RecvError::Lagged`.
const TIMELINE_CHANNEL_CAP: usize = 1024;
// NOTE(review): the constants below are consumed by code outside this view;
// the descriptions are inferred from their names and should be confirmed.
/// Presumably the page size used when reading a session's history.
const SESSION_HISTORY_PAGE_LIMIT: usize = 500;
/// Presumably how long (ms) a session-history refresh stays fresh.
const SESSION_HISTORY_REFRESH_TTL_MS: u64 = 30_000;
/// Presumably a shorter refresh TTL (ms) for sessions whose history is still growing.
const SESSION_HISTORY_GROWING_REFRESH_TTL_MS: u64 = 2_000;
/// Presumably the period of the session-history discovery loop.
const SESSION_HISTORY_DISCOVERY_INTERVAL: Duration = Duration::from_secs(5);
/// Presumably the upper bound on how long an explicit-identity timeline query
/// waits for backfill before re-querying.
const EXPLICIT_IDENTITY_BACKFILL_WAIT: Duration = Duration::from_millis(750);
49
/// Aggregates console events from registered runtimes into one persisted,
/// queryable timeline backed by a [`ConsoleLogStore`].
///
/// Cloning is cheap: every clone shares the same inner state through an `Arc`.
#[derive(Clone)]
pub struct MobKitConsoleAggregator {
    inner: Arc<AggregatorInner>,
}
54
/// Tuning knobs for [`MobKitConsoleAggregator`].
#[derive(Debug, Clone, Copy)]
pub struct ConsoleAggregatorOptions {
    /// Gates session-history backfill. When `false`, identity listings skip
    /// spawning per-identity backfills (other backfill paths may also consult it).
    pub session_history_backfill_enabled: bool,
    /// Number of permits in the session-backfill semaphore; the aggregator
    /// constructor raises values below 1 to 1.
    pub max_concurrent_session_backfills: usize,
}

impl Default for ConsoleAggregatorOptions {
    /// Backfill enabled, with up to 16 session-backfill permits.
    fn default() -> Self {
        let max_concurrent_session_backfills = 16;
        let session_history_backfill_enabled = true;
        Self {
            session_history_backfill_enabled,
            max_concurrent_session_backfills,
        }
    }
}
69
/// Shared state behind every clone of [`MobKitConsoleAggregator`].
struct AggregatorInner {
    /// Persistent frame log that backs the timeline.
    store: Arc<dyn ConsoleLogStore>,
    /// Registered runtimes keyed by runtime key; re-registering a key replaces its entry.
    runtimes: RwLock<BTreeMap<String, RuntimeEntry>>,
    /// Broadcasts timeline events to `subscribe` receivers.
    event_tx: broadcast::Sender<ConsoleTimelineEvent>,
    // NOTE(review): the two sets below are managed by backfill code outside
    // this view; presumably they hold keys of in-flight backfills — confirm.
    active_session_backfills: tokio::sync::Mutex<BTreeSet<String>>,
    opportunistic_session_backfills: tokio::sync::Mutex<BTreeSet<String>>,
    /// Semaphore sized from `options.max_concurrent_session_backfills` (at least 1).
    session_backfill_permits: Arc<Semaphore>,
    /// Cached list of visible identity records.
    identity_read_model: ConsoleIdentityReadModel,
    /// Options after constructor normalisation.
    options: ConsoleAggregatorOptions,
}
80
81#[derive(Clone)]
82struct ConsoleIdentityReadModel {
83 inner: Arc<tokio::sync::RwLock<Vec<ConsoleIdentityRecord>>>,
84 refresh_lock: Arc<tokio::sync::Mutex<()>>,
85 primed: Arc<AtomicBool>,
86}
87
88impl Default for ConsoleIdentityReadModel {
89 fn default() -> Self {
90 Self {
91 inner: Arc::new(tokio::sync::RwLock::new(Vec::new())),
92 refresh_lock: Arc::new(tokio::sync::Mutex::new(())),
93 primed: Arc::new(AtomicBool::new(false)),
94 }
95 }
96}
97
98impl ConsoleIdentityReadModel {
99 async fn snapshot(
100 &self,
101 inner: Arc<AggregatorInner>,
102 ) -> ConsoleLogResult<Vec<ConsoleIdentityRecord>> {
103 if !self.primed.load(Ordering::Acquire) {
104 self.prime_now(inner).await?;
105 }
106 Ok(self.inner.read().await.clone())
107 }
108
109 async fn prime_now(&self, inner: Arc<AggregatorInner>) -> ConsoleLogResult<()> {
110 if self.primed.load(Ordering::Acquire) {
111 return Ok(());
112 }
113 let _guard = self.refresh_lock.clone().lock_owned().await;
114 if self.primed.load(Ordering::Acquire) {
115 return Ok(());
116 }
117 let identities = collect_identity_records(&inner).await?;
118 *self.inner.write().await = identities;
119 self.primed.store(true, Ordering::Release);
120 Ok(())
121 }
122
123 fn refresh_soon(&self, inner: Arc<AggregatorInner>) {
124 let Ok(runtime_handle) = tokio::runtime::Handle::try_current() else {
125 return;
126 };
127 let Ok(guard) = self.refresh_lock.clone().try_lock_owned() else {
128 return;
129 };
130 let read_model = self.clone();
131 runtime_handle.spawn(async move {
132 let _guard = guard;
133 match collect_identity_records(&inner).await {
134 Ok(identities) => {
135 *read_model.inner.write().await = identities;
136 read_model.primed.store(true, Ordering::Release);
137 }
138 Err(err) => {
139 tracing::warn!(error = %err, "console identity read-model refresh failed");
140 }
141 }
142 });
143 }
144
145 async fn replace(&self, identities: Vec<ConsoleIdentityRecord>) {
146 *self.inner.write().await = identities;
147 self.primed.store(true, Ordering::Release);
148 }
149}
150
/// One registered runtime, as stored in the aggregator's runtime registry.
#[derive(Clone)]
struct RuntimeEntry {
    /// Registry key; also recorded as `runtime_key` on frames from this runtime.
    runtime_key: String,
    /// Namespace removed (via `strip_namespace`) from external identities
    /// before matching them against this runtime's members.
    identity_namespace: String,
    /// Mob runtime used to list members and reach session/blob services.
    runtime: MobRuntime,
    /// Console event source that is replayed and followed live for projection.
    console_events: ConsoleEventStore,
    /// Visibility/redaction policy applied to this runtime's members and frames.
    visibility_policy: Arc<dyn ConsoleVisibilityPolicy>,
}
159
/// A mob member paired with the runtime entry and mob handle it was listed from.
#[derive(Clone)]
struct ResolvedConsoleMember {
    /// Runtime registration the member belongs to.
    entry: RuntimeEntry,
    /// Handle of the mob (primary or implicit delegate) that listed the member.
    handle: MobHandle,
    /// Member listing entry as returned by the mob.
    member: MobMemberListEntry,
    /// The member's `agent_identity` rendered as a string, used to address it on `handle`.
    runtime_identity: String,
}
167
/// Policy hooks that decide which members, identities and frames a console
/// registration exposes, and whether frame payloads are redacted.
///
/// Every hook defaults to the permissive behaviour: everything is visible and
/// nothing is redacted.
pub trait ConsoleVisibilityPolicy: Send + Sync {
    /// Whether members of non-primary mobs reachable through the runtime's
    /// agent-mob MCP state are enumerated alongside the primary mob's members.
    fn include_implicit_delegate_members(&self) -> bool {
        true
    }

    /// Whether a console member is visible.
    // NOTE(review): the caller of this hook is outside this view — confirm.
    fn member_visible(&self, _member: &ConsoleMember) -> bool {
        true
    }

    /// Whether an identity record is visible. Hidden identities are left out
    /// of identity listings and treated as absent by inspect, retire and send.
    fn identity_visible(&self, _record: &ConsoleIdentityRecord) -> bool {
        true
    }

    /// Whether a persisted timeline frame is visible.
    // NOTE(review): presumably consulted by the frame-visibility helpers
    // outside this view — confirm.
    fn frame_visible(&self, _frame: &ConsoleFrame) -> bool {
        true
    }

    /// Optional replacement payload for a frame about to be persisted.
    /// When it returns `Some`, `send` stores the returned value and marks the
    /// frame `Redacted`; `None` keeps the payload unchanged.
    fn redact_payload(&self, _frame: &NewConsoleFrame) -> Option<Value> {
        None
    }
}
189
/// Visibility policy that keeps every trait default: all members, identities
/// and frames are visible and nothing is redacted.
#[derive(Debug, Default)]
pub struct AllowAllConsoleVisibilityPolicy;

impl ConsoleVisibilityPolicy for AllowAllConsoleVisibilityPolicy {}
194
195#[derive(Debug, Default)]
196pub struct HideImplicitDelegateMembersConsoleVisibilityPolicy;
197
198impl ConsoleVisibilityPolicy for HideImplicitDelegateMembersConsoleVisibilityPolicy {
199 fn include_implicit_delegate_members(&self) -> bool {
200 false
201 }
202
203 fn member_visible(&self, member: &ConsoleMember) -> bool {
204 !is_implicit_delegate_member(member.role.as_str(), &member.labels)
205 }
206
207 fn identity_visible(&self, record: &ConsoleIdentityRecord) -> bool {
208 !is_implicit_delegate_member(
209 record
210 .labels
211 .get("role")
212 .map(String::as_str)
213 .unwrap_or_default(),
214 &record.labels,
215 )
216 }
217}
218
/// Everything needed to register a runtime with [`MobKitConsoleAggregator::register_runtime`].
#[derive(Clone)]
pub struct ConsoleRuntimeRegistration {
    /// Registry key; registering the same key again replaces the earlier entry.
    pub runtime_key: String,
    /// Runtime whose mob members and console events are aggregated.
    pub runtime: Arc<UnifiedRuntime>,
    /// Namespace removed from external identities before matching this runtime's members.
    pub identity_namespace: String,
    /// Visibility/redaction policy for this runtime.
    pub visibility_policy: Arc<dyn ConsoleVisibilityPolicy>,
}
226
227impl MobKitConsoleAggregator {
228 pub fn new(store: Arc<dyn ConsoleLogStore>) -> Self {
229 Self::new_with_options(store, ConsoleAggregatorOptions::default())
230 }
231
232 pub fn new_with_options(
233 store: Arc<dyn ConsoleLogStore>,
234 mut options: ConsoleAggregatorOptions,
235 ) -> Self {
236 options.max_concurrent_session_backfills = options.max_concurrent_session_backfills.max(1);
237 let (event_tx, _) = broadcast::channel(TIMELINE_CHANNEL_CAP);
238 Self {
239 inner: Arc::new(AggregatorInner {
240 store,
241 runtimes: RwLock::new(BTreeMap::new()),
242 event_tx,
243 active_session_backfills: tokio::sync::Mutex::new(BTreeSet::new()),
244 opportunistic_session_backfills: tokio::sync::Mutex::new(BTreeSet::new()),
245 session_backfill_permits: Arc::new(Semaphore::new(
246 options.max_concurrent_session_backfills,
247 )),
248 identity_read_model: ConsoleIdentityReadModel::default(),
249 options,
250 }),
251 }
252 }
253
254 pub fn in_memory() -> Self {
255 Self::new(Arc::new(InMemoryConsoleLogStore::new()))
256 }
257
258 pub fn in_memory_with_options(options: ConsoleAggregatorOptions) -> Self {
259 Self::new_with_options(Arc::new(InMemoryConsoleLogStore::new()), options)
260 }
261
262 pub fn subscribe(&self) -> broadcast::Receiver<ConsoleTimelineEvent> {
263 self.inner.event_tx.subscribe()
264 }
265
266 pub fn store(&self) -> Arc<dyn ConsoleLogStore> {
267 self.inner.store.clone()
268 }
269
270 pub fn register_runtime(&self, registration: ConsoleRuntimeRegistration) {
271 self.register_runtime_handles_with_policy(
272 registration.runtime_key,
273 registration.identity_namespace,
274 registration.runtime.mob_runtime().clone(),
275 registration.runtime.console_events(),
276 registration.visibility_policy,
277 );
278 }
279
280 pub(crate) fn register_runtime_handles_with_policy(
281 &self,
282 runtime_key: impl Into<String>,
283 identity_namespace: impl Into<String>,
284 runtime: MobRuntime,
285 console_events: ConsoleEventStore,
286 visibility_policy: Arc<dyn ConsoleVisibilityPolicy>,
287 ) {
288 let runtime_key = runtime_key.into();
289 let identity_namespace = identity_namespace.into();
290 let entry = RuntimeEntry {
291 runtime_key: runtime_key.clone(),
292 identity_namespace,
293 runtime,
294 console_events: console_events.clone(),
295 visibility_policy,
296 };
297 if let Ok(mut runtimes) = self.inner.runtimes.write() {
298 runtimes.insert(runtime_key.clone(), entry);
299 }
300 self.inner
301 .identity_read_model
302 .refresh_soon(self.inner.clone());
303 let inner = self.inner.clone();
304 let events_for_live = console_events.clone();
305 let events_for_live_recovery = console_events.clone();
306 let runtime_key_for_live = runtime_key.clone();
307 tokio::spawn(async move {
308 let mut rx = events_for_live.subscribe();
309 loop {
310 match rx.recv().await {
311 Ok(envelope) => {
312 let _ =
313 project_console_event(inner.clone(), &runtime_key_for_live, envelope)
314 .await;
315 }
316 Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
317 let _ = recover_lagged_source_events(
318 inner.clone(),
319 &runtime_key_for_live,
320 &events_for_live_recovery,
321 )
322 .await;
323 }
324 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
325 }
326 }
327 });
328
329 let inner = self.inner.clone();
330 let events_for_replay = console_events;
331 let runtime_key_for_replay = runtime_key;
332 tokio::spawn(async move {
333 let mut ingestion_state = SourceIngestionState::Registered;
334 if let Ok((next, _effects)) =
335 ingestion_state.apply(SourceIngestionTransition::StartBackfill)
336 {
337 ingestion_state = next;
338 }
339 if let Ok(events) = events_for_replay.replay_all(None).await {
340 for envelope in events {
341 let _ = project_console_event(inner.clone(), &runtime_key_for_replay, envelope)
342 .await;
343 }
344 }
345 spawn_session_history_backfill(inner.clone(), runtime_key_for_replay.clone());
346 spawn_session_history_discovery_loop(inner.clone(), runtime_key_for_replay.clone());
347 if let Ok((next, _effects)) =
348 ingestion_state.apply(SourceIngestionTransition::BackfillComplete)
349 {
350 ingestion_state = next;
351 }
352 let _ = ingestion_state.apply(SourceIngestionTransition::StartLive);
353 });
354 }
355
356 pub async fn list_identities(&self) -> ConsoleLogResult<Vec<ConsoleIdentityRecord>> {
357 self.inner
358 .identity_read_model
359 .refresh_soon(self.inner.clone());
360 let identities = self
361 .inner
362 .identity_read_model
363 .snapshot(self.inner.clone())
364 .await?;
365 spawn_identity_backfills_for_records(self.inner.clone(), &identities);
366 Ok(identities)
367 }
368
369 pub(crate) async fn list_identities_fresh(
370 &self,
371 ) -> ConsoleLogResult<Vec<ConsoleIdentityRecord>> {
372 let _guard = self
373 .inner
374 .identity_read_model
375 .refresh_lock
376 .clone()
377 .lock_owned()
378 .await;
379 let identities = collect_identity_records(&self.inner).await?;
380 self.inner
381 .identity_read_model
382 .replace(identities.clone())
383 .await;
384 spawn_identity_backfills_for_records(self.inner.clone(), &identities);
385 Ok(identities)
386 }
387
388 pub async fn inspect_identity(
389 &self,
390 identity: &str,
391 ) -> ConsoleLogResult<Option<ConsoleIdentityInspection>> {
392 let Some(resolved) = self.resolve_member(identity).await else {
393 return Ok(None);
394 };
395 let Some(record) =
396 identity_record_for_member(&resolved.entry, &resolved.handle, &resolved.member).await
397 else {
398 return Ok(None);
399 };
400 if !resolved.entry.visibility_policy.identity_visible(&record) {
401 return Ok(None);
402 }
403 if let Some(session_id) = record.session_id.clone() {
404 spawn_session_history_backfill_target(
405 self.inner.clone(),
406 SessionBackfillTarget {
407 entry: resolved.entry.clone(),
408 record: record.clone(),
409 session_id,
410 },
411 false,
412 );
413 }
414 let peers = resolved
415 .member
416 .wired_to
417 .iter()
418 .map(ToString::to_string)
419 .collect();
420 Ok(Some(ConsoleIdentityInspection {
421 identity: record,
422 peers,
423 }))
424 }
425
426 pub async fn retire_identity(&self, identity: &str) -> ConsoleLogResult<bool> {
427 let matches = self.resolve_members(identity).await;
428 let mut retired_any = false;
429 for resolved in matches {
430 let Some(record) =
431 identity_record_for_member(&resolved.entry, &resolved.handle, &resolved.member)
432 .await
433 else {
434 continue;
435 };
436 if !resolved.entry.visibility_policy.identity_visible(&record) {
437 continue;
438 }
439 resolved
440 .handle
441 .retire(MeerkatId::from(resolved.runtime_identity.as_str()))
442 .await
443 .map_err(|err| -> ConsoleLogError {
444 format!("retire failed for {identity}: {err}").into()
445 })?;
446 retired_any = true;
447 }
448 Ok(retired_any)
449 }
450
451 pub async fn clear_timeline_frames(&self) -> ConsoleLogResult<()> {
452 self.inner.store.clear_frames().await
453 }
454
455 pub async fn query_timeline(
456 &self,
457 query: ConsoleTimelineQuery,
458 ) -> ConsoleLogResult<ConsoleTimelinePage> {
459 let explicit_identity = query.identity.clone();
460 let mut page = self.inner.store.query_frames(query.clone()).await?;
461 if page.frames.is_empty()
462 && let Some(identity) = explicit_identity.clone()
463 {
464 backfill_identity_for_explicit_query(self.inner.clone(), identity).await;
465 page = self.inner.store.query_frames(query).await?;
466 }
467 let mut visible_frames = Vec::with_capacity(page.frames.len());
468 let mut identity_visibility_cache = HashMap::new();
469 for frame in page.frames {
470 let allow_historical_identity =
471 explicit_identity.as_deref() == Some(frame.identity.as_str());
472 if frame_is_visible_cached(
473 &self.inner,
474 &frame,
475 allow_historical_identity,
476 &mut identity_visibility_cache,
477 )
478 .await
479 .unwrap_or(false)
480 {
481 visible_frames.push(frame);
482 }
483 }
484 page.frames = visible_frames;
485 Ok(page)
486 }
487
488 pub async fn refresh_session_history(&self) -> ConsoleLogResult<()> {
489 let runtime_keys = self
490 .inner
491 .runtimes
492 .read()
493 .map_err(|_| runtime_registry_lock_error())?
494 .keys()
495 .cloned()
496 .collect::<Vec<_>>();
497 let results =
498 join_all(runtime_keys.into_iter().map(|runtime_key| {
499 backfill_session_history(self.inner.clone(), runtime_key, true)
500 }))
501 .await;
502 for result in results {
503 result?;
504 }
505 Ok(())
506 }
507
508 pub async fn latest_cursor(&self) -> ConsoleLogResult<Option<ConsoleCursor>> {
509 self.inner.store.latest_cursor().await
510 }
511
512 pub async fn timeline_event_visible(&self, event: &ConsoleTimelineEvent) -> bool {
513 match event {
514 ConsoleTimelineEvent::ConsoleFrame { frame }
515 | ConsoleTimelineEvent::FrameUpdated { frame } => {
516 frame_is_visible(&self.inner, frame, false)
517 .await
518 .unwrap_or(false)
519 }
520 ConsoleTimelineEvent::SnapshotStarted { .. }
521 | ConsoleTimelineEvent::SnapshotComplete { .. }
522 | ConsoleTimelineEvent::ReplayUnavailable { .. } => true,
523 }
524 }
525
526 pub async fn timeline_frame_visible_for_query(
527 &self,
528 frame: &ConsoleFrame,
529 identity: Option<&str>,
530 ) -> bool {
531 let allow_historical_identity = identity == Some(frame.identity.as_str());
532 frame_is_visible(&self.inner, frame, allow_historical_identity)
533 .await
534 .unwrap_or(false)
535 }
536
537 pub async fn send(
538 &self,
539 request: ConsoleSendRequest,
540 ) -> Result<ConsoleInteractionAccepted, ConsoleSendError> {
541 validate_send_request(&request)?;
542 let Some(resolved) = self.resolve_member(&request.identity).await else {
543 return Err(ConsoleSendError::UnknownIdentity(request.identity));
544 };
545 let Some(record) =
546 identity_record_for_member(&resolved.entry, &resolved.handle, &resolved.member).await
547 else {
548 return Err(ConsoleSendError::UnknownIdentity(request.identity));
549 };
550 if !resolved.entry.visibility_policy.identity_visible(&record) {
551 return Err(ConsoleSendError::UnknownIdentity(request.identity));
552 }
553 if !member_is_addressable(&resolved.member) {
554 return Err(ConsoleSendError::NotAddressable(request.identity));
555 }
556 if resolved.member.state == meerkat_mob::MemberState::Retiring {
557 return Err(ConsoleSendError::Retired(request.identity));
558 }
559
560 let content = content_input_from_value(&request.content)?;
561 let handling_mode = parse_handling_mode(request.handling_mode.as_deref())?;
562 assert_member_accepts_images(
563 &resolved.handle,
564 resolved.entry.runtime.session_service(),
565 &resolved.runtime_identity,
566 &content,
567 )
568 .await
569 .map_err(|err| ConsoleSendError::InvalidContent(err.to_string()))?;
570
571 let dedupe_key = send_dedupe_key(
572 &resolved.entry.runtime_key,
573 &request.identity,
574 &request.origin,
575 &request.idempotency_key,
576 );
577 let handling_mode_value = request
578 .handling_mode
579 .as_deref()
580 .unwrap_or("queue")
581 .to_string();
582 let request_fingerprint =
583 send_request_fingerprint(&request.origin, &request.content, &handling_mode_value);
584 if let Some(existing) = self
585 .inner
586 .store
587 .frame_by_dedupe_key(&dedupe_key)
588 .await
589 .map_err(ConsoleSendError::Log)?
590 {
591 let same_request = existing.source.source_cursor.as_deref()
592 == Some(request_fingerprint.as_str())
593 || existing.source.source_cursor.is_none()
594 && existing.payload.get("origin").and_then(Value::as_str)
595 == Some(request.origin.as_str())
596 && existing.payload.get("content") == Some(&request.content)
597 && existing
598 .payload
599 .get("handling_mode")
600 .and_then(Value::as_str)
601 == Some(handling_mode_value.as_str());
602 if !same_request {
603 return Err(ConsoleSendError::IdempotencyConflict(
604 request.idempotency_key,
605 ));
606 }
607 return Ok(accepted_from_frame(&existing));
608 }
609
610 let interaction_id = format!("console-interaction-{}", hash_short(&dedupe_key));
611 resolved
612 .entry
613 .console_events
614 .reserve_interaction_value(
615 &resolved.runtime_identity,
616 Some(resolved.runtime_identity.as_str()),
617 &interaction_id,
618 &request.origin,
619 request.content.clone(),
620 )
621 .await
622 .map_err(ConsoleSendError::State)?;
623 let session_id = resolved
624 .handle
625 .resolve_bridge_session_id(&MeerkatId::from(resolved.runtime_identity.as_str()))
626 .await
627 .map(|sid| sid.to_string());
628 let mut new_frame = NewConsoleFrame {
629 id: None,
630 dedupe_key,
631 timestamp_ms: current_time_ms(),
632 runtime_key: resolved.entry.runtime_key.clone(),
633 identity: request.identity.clone(),
634 conversation_id: Some(request.identity.clone()),
635 session_id: session_id.clone(),
636 kind: "user_input".to_string(),
637 status: ConsoleFrameStatus::Accepted,
638 payload: json!({
639 "content": request.content,
640 "origin": request.origin,
641 "idempotency_key": request.idempotency_key,
642 "handling_mode": handling_mode_value,
643 }),
644 source: ConsoleFrameSource {
645 kind: ConsoleFrameSourceKind::Send,
646 source_cursor: Some(request_fingerprint),
647 },
648 source_event_id: None,
649 interaction_id: Some(interaction_id.clone()),
650 turn_id: None,
651 run_id: None,
652 parent_frame_id: None,
653 caused_by_frame_id: None,
654 };
655 if let Some(redacted) = resolved.entry.visibility_policy.redact_payload(&new_frame) {
656 new_frame.payload = redacted;
657 new_frame.status = ConsoleFrameStatus::Redacted;
658 }
659
660 let _ = SendState::Requested
661 .apply(SendTransition::PersistAccepted)
662 .map_err(ConsoleSendError::State)?;
663 let outcome = self
664 .inner
665 .store
666 .append_if_absent(new_frame)
667 .await
668 .map_err(ConsoleSendError::Log)?;
669 if outcome.disposition == AppendDisposition::Existing {
670 return Ok(accepted_from_frame(&outcome.frame));
671 }
672 let _ = self
673 .inner
674 .event_tx
675 .send(ConsoleTimelineEvent::ConsoleFrame {
676 frame: outcome.frame.clone(),
677 });
678 let accepted = accepted_from_frame(&outcome.frame);
679
680 let (dispatching, _effects) = SendState::AcceptedPersisted
681 .apply(SendTransition::StartDispatch)
682 .map_err(ConsoleSendError::State)?;
683 update_frame_status_and_emit(
684 &self.inner,
685 &outcome.frame.id,
686 ConsoleFrameStatus::Dispatching,
687 )
688 .await
689 .map_err(ConsoleSendError::Log)?;
690
691 spawn_console_send_dispatch(
692 self.inner.clone(),
693 resolved,
694 content,
695 handling_mode,
696 dispatching,
697 outcome.frame,
698 interaction_id,
699 );
700 Ok(accepted)
701 }
702
703 pub async fn reserve_identity_first_interaction(
704 &self,
705 request: ConsoleSendRequest,
706 session_id: Option<&str>,
707 ) -> Result<ConsoleInteractionAccepted, ConsoleSendError> {
708 validate_send_request(&request)?;
709 let _content = content_input_from_value(&request.content)?;
710 let handling_mode_value = request
711 .handling_mode
712 .as_deref()
713 .unwrap_or("queue")
714 .to_string();
715 let dedupe_key = send_dedupe_key(
716 "identity-first",
717 &request.identity,
718 &request.origin,
719 &request.idempotency_key,
720 );
721 let request_fingerprint =
722 send_request_fingerprint(&request.origin, &request.content, &handling_mode_value);
723 if let Some(existing) = self
724 .inner
725 .store
726 .frame_by_dedupe_key(&dedupe_key)
727 .await
728 .map_err(ConsoleSendError::Log)?
729 {
730 let same_request = existing.source.source_cursor.as_deref()
731 == Some(request_fingerprint.as_str())
732 || existing.source.source_cursor.is_none()
733 && existing.payload.get("origin").and_then(Value::as_str)
734 == Some(request.origin.as_str())
735 && existing.payload.get("content") == Some(&request.content)
736 && existing
737 .payload
738 .get("handling_mode")
739 .and_then(Value::as_str)
740 == Some(handling_mode_value.as_str());
741 if !same_request {
742 return Err(ConsoleSendError::IdempotencyConflict(
743 request.idempotency_key,
744 ));
745 }
746 return Ok(accepted_from_frame(&existing));
747 }
748
749 let interaction_id = format!("console-interaction-{}", hash_short(&dedupe_key));
750 let new_frame = NewConsoleFrame {
751 id: None,
752 dedupe_key,
753 timestamp_ms: current_time_ms(),
754 runtime_key: "identity-first".to_string(),
755 identity: request.identity.clone(),
756 conversation_id: Some(request.identity.clone()),
757 session_id: session_id.map(ToString::to_string),
758 kind: "user_input".to_string(),
759 status: ConsoleFrameStatus::Accepted,
760 payload: json!({
761 "content": request.content,
762 "origin": request.origin,
763 "idempotency_key": request.idempotency_key,
764 "handling_mode": handling_mode_value,
765 }),
766 source: ConsoleFrameSource {
767 kind: ConsoleFrameSourceKind::Send,
768 source_cursor: Some(request_fingerprint),
769 },
770 source_event_id: None,
771 interaction_id: Some(interaction_id),
772 turn_id: None,
773 run_id: None,
774 parent_frame_id: None,
775 caused_by_frame_id: None,
776 };
777 let outcome = self
778 .inner
779 .store
780 .append_if_absent(new_frame)
781 .await
782 .map_err(ConsoleSendError::Log)?;
783 let _ = self
784 .inner
785 .event_tx
786 .send(ConsoleTimelineEvent::ConsoleFrame {
787 frame: outcome.frame.clone(),
788 });
789 Ok(accepted_from_frame(&outcome.frame))
790 }
791
792 pub async fn mark_interaction_delivery_failed(
793 &self,
794 input_frame_id: &str,
795 ) -> Result<(), ConsoleSendError> {
796 update_frame_status_and_emit(
797 &self.inner,
798 input_frame_id,
799 ConsoleFrameStatus::DeliveryFailed,
800 )
801 .await
802 .map_err(ConsoleSendError::Log)?;
803 Ok(())
804 }
805
806 pub async fn binary_blob_store_for_identity(
807 &self,
808 identity: &str,
809 ) -> Result<Option<Arc<dyn BinaryBlobStore>>, ConsoleSendError> {
810 if identity.trim().is_empty() {
811 return Err(ConsoleSendError::InvalidRequest(
812 "identity must be non-empty".to_string(),
813 ));
814 }
815 let Some(resolved) = self.resolve_member(identity).await else {
816 return Err(ConsoleSendError::UnknownIdentity(identity.to_string()));
817 };
818 let Some(record) =
819 identity_record_for_member(&resolved.entry, &resolved.handle, &resolved.member).await
820 else {
821 return Err(ConsoleSendError::UnknownIdentity(identity.to_string()));
822 };
823 if !resolved.entry.visibility_policy.identity_visible(&record) {
824 return Err(ConsoleSendError::UnknownIdentity(identity.to_string()));
825 }
826 if !member_is_addressable(&resolved.member) {
827 return Err(ConsoleSendError::NotAddressable(identity.to_string()));
828 }
829 if resolved.member.state == meerkat_mob::MemberState::Retiring {
830 return Err(ConsoleSendError::Retired(identity.to_string()));
831 }
832 Ok(resolved.entry.runtime.binary_blob_store())
833 }
834
835 pub fn binary_blob_stores(&self) -> Vec<Arc<dyn BinaryBlobStore>> {
836 self.inner
837 .runtimes
838 .read()
839 .map(|entries| {
840 entries
841 .values()
842 .filter_map(|entry| entry.runtime.binary_blob_store())
843 .collect()
844 })
845 .unwrap_or_default()
846 }
847
848 async fn resolve_member(&self, identity: &str) -> Option<ResolvedConsoleMember> {
849 self.resolve_members(identity).await.into_iter().next()
850 }
851
852 async fn resolve_members(&self, identity: &str) -> Vec<ResolvedConsoleMember> {
853 let entries = self
854 .inner
855 .runtimes
856 .read()
857 .ok()
858 .map(|entries| entries.clone())
859 .unwrap_or_default();
860 let mut matches: Vec<(String, ResolvedConsoleMember)> = Vec::new();
861 for entry in entries.values() {
862 let Some(raw_identity) = strip_namespace(identity, &entry.identity_namespace) else {
863 continue;
864 };
865 let mid = MeerkatId::from(raw_identity.as_str());
866 for resolved in member_sources_for_entry(entry)
867 .await
868 .into_iter()
869 .filter(|candidate| {
870 candidate.member.agent_identity == mid
871 || candidate
872 .member
873 .labels
874 .get("agent_identity")
875 .is_some_and(|agent_identity| agent_identity == &raw_identity)
876 })
877 {
878 let session_id = resolved
879 .handle
880 .resolve_bridge_session_id(&resolved.member.agent_identity)
881 .await
882 .map(|sid| sid.to_string())
883 .unwrap_or_default();
884 matches.push((session_id, resolved));
885 }
886 }
887 matches.sort_by(|left, right| right.0.cmp(&left.0));
888 matches.into_iter().map(|(_, resolved)| resolved).collect()
889 }
890}
891
892fn dedupe_identity_records(records: Vec<ConsoleIdentityRecord>) -> Vec<ConsoleIdentityRecord> {
893 let mut by_identity: BTreeMap<String, ConsoleIdentityRecord> = BTreeMap::new();
894 for record in records {
895 by_identity
896 .entry(record.identity.clone())
897 .and_modify(|current| {
898 if identity_record_prefer(&record, current) {
899 *current = record.clone();
900 }
901 })
902 .or_insert(record);
903 }
904 by_identity.into_values().collect()
905}
906
907fn identity_record_prefer(
908 candidate: &ConsoleIdentityRecord,
909 current: &ConsoleIdentityRecord,
910) -> bool {
911 let candidate_live = candidate.addressable && candidate.health != "retired";
912 let current_live = current.addressable && current.health != "retired";
913 if candidate_live != current_live {
914 return candidate_live;
915 }
916 candidate.session_id.as_deref().unwrap_or("") > current.session_id.as_deref().unwrap_or("")
917}
918
919async fn collect_identity_records(
920 inner: &Arc<AggregatorInner>,
921) -> ConsoleLogResult<Vec<ConsoleIdentityRecord>> {
922 let entries = inner
923 .runtimes
924 .read()
925 .map_err(|_| runtime_registry_lock_error())?
926 .clone();
927 let mut identities = Vec::new();
928 for entry in entries.values() {
929 for resolved in member_sources_for_entry(entry).await {
930 if let Some(record) =
931 identity_record_for_member(entry, &resolved.handle, &resolved.member).await
932 && entry.visibility_policy.identity_visible(&record)
933 {
934 identities.push(record);
935 }
936 }
937 }
938 Ok(dedupe_identity_records(identities))
939}
940
941fn spawn_identity_backfills_for_records(
942 inner: Arc<AggregatorInner>,
943 records: &[ConsoleIdentityRecord],
944) {
945 if !inner.options.session_history_backfill_enabled {
946 return;
947 }
948 let entries = match inner.runtimes.read() {
949 Ok(entries) => entries.clone(),
950 Err(_) => return,
951 };
952 for record in records {
953 let Some(entry) = entries.get(&record.runtime_key).cloned() else {
954 continue;
955 };
956 let Some(session_id) = record.session_id.clone() else {
957 continue;
958 };
959 spawn_session_history_backfill_target(
960 inner.clone(),
961 SessionBackfillTarget {
962 entry,
963 record: record.clone(),
964 session_id,
965 },
966 false,
967 );
968 }
969}
970
971async fn member_sources_for_entry(entry: &RuntimeEntry) -> Vec<ResolvedConsoleMember> {
972 let mut resolved = Vec::new();
973 let primary_handle = entry.runtime.handle();
974 let primary_mob_id = primary_handle.mob_id().to_string();
975 for member in primary_handle.list_members_including_retiring().await {
976 resolved.push(ResolvedConsoleMember {
977 entry: entry.clone(),
978 handle: primary_handle.clone(),
979 runtime_identity: member.agent_identity.to_string(),
980 member,
981 });
982 }
983
984 let Some(state) = entry.runtime.agent_mob_mcp_state() else {
985 return resolved;
986 };
987 if !entry.visibility_policy.include_implicit_delegate_members() {
988 return resolved;
989 }
990 for (mob_id, _state) in state.mob_list().await {
991 if mob_id.as_str() == primary_mob_id {
992 continue;
993 }
994 let Ok(handle) = state.handle_for(&mob_id).await else {
995 continue;
996 };
997 for member in handle.list_members_including_retiring().await {
998 resolved.push(ResolvedConsoleMember {
999 entry: entry.clone(),
1000 handle: handle.clone(),
1001 runtime_identity: member.agent_identity.to_string(),
1002 member,
1003 });
1004 }
1005 }
1006 resolved
1007}
1008
1009async fn dispatch_message_to_resolved_member(
1010 resolved: &ResolvedConsoleMember,
1011 content: ContentInput,
1012 handling_mode: meerkat_core::types::HandlingMode,
1013) -> Result<String, String> {
1014 let mid = MeerkatId::from(resolved.runtime_identity.as_str());
1015 match send_message_on_mob_with_mode(
1016 &resolved.handle,
1017 &resolved.runtime_identity,
1018 content.clone(),
1019 handling_mode,
1020 )
1021 .await
1022 {
1023 Ok(session_id) => Ok(session_id),
1024 Err(err) if err.to_string().contains("not externally addressable") => {
1025 let member = resolved
1026 .handle
1027 .member(&mid)
1028 .await
1029 .map_err(|err| err.to_string())?;
1030 let _receipt = member
1031 .internal_turn(content)
1032 .await
1033 .map_err(|err| err.to_string())?;
1034 resolved
1035 .handle
1036 .resolve_bridge_session_id(&mid)
1037 .await
1038 .map(|sid| sid.to_string())
1039 .ok_or_else(|| "member has no bridge session after internal turn".to_string())
1040 }
1041 Err(err) => Err(err.to_string()),
1042 }
1043}
1044
1045fn spawn_console_send_dispatch(
1046 inner: Arc<AggregatorInner>,
1047 resolved: ResolvedConsoleMember,
1048 content: ContentInput,
1049 handling_mode: meerkat_core::types::HandlingMode,
1050 dispatching: SendState,
1051 user_frame: ConsoleFrame,
1052 interaction_id: String,
1053) {
1054 tokio::spawn(async move {
1055 match dispatch_message_to_resolved_member(&resolved, content, handling_mode).await {
1056 Ok(_) => {
1057 let _ = dispatching.apply(SendTransition::MarkDelivered);
1058 if let Err(err) = update_frame_status_and_emit(
1059 &inner,
1060 &user_frame.id,
1061 ConsoleFrameStatus::Delivered,
1062 )
1063 .await
1064 {
1065 tracing::warn!(
1066 frame_id = %user_frame.id,
1067 error = %err,
1068 "failed to update console send delivery status"
1069 );
1070 }
1071 }
1072 Err(err) => {
1073 let _ = dispatching.apply(SendTransition::MarkDeliveryFailed);
1074 if let Err(update_err) = update_frame_status_and_emit(
1075 &inner,
1076 &user_frame.id,
1077 ConsoleFrameStatus::DeliveryFailed,
1078 )
1079 .await
1080 {
1081 tracing::warn!(
1082 frame_id = %user_frame.id,
1083 error = %update_err,
1084 "failed to update console send failure status"
1085 );
1086 }
1087 let failure_frame = NewConsoleFrame {
1088 id: None,
1089 dedupe_key: format!("delivery-failed:{}", user_frame.id),
1090 timestamp_ms: current_time_ms(),
1091 runtime_key: user_frame.runtime_key,
1092 identity: user_frame.identity,
1093 conversation_id: user_frame.conversation_id,
1094 session_id: user_frame.session_id,
1095 kind: "message_delivery_failed".to_string(),
1096 status: ConsoleFrameStatus::DeliveryFailed,
1097 payload: json!({ "reason": err }),
1098 source: ConsoleFrameSource {
1099 kind: ConsoleFrameSourceKind::Synthetic,
1100 source_cursor: None,
1101 },
1102 source_event_id: None,
1103 interaction_id: Some(interaction_id),
1104 turn_id: None,
1105 run_id: None,
1106 parent_frame_id: Some(user_frame.id.clone()),
1107 caused_by_frame_id: Some(user_frame.id),
1108 };
1109 if let Err(append_err) = append_and_emit(&inner, failure_frame).await {
1110 tracing::warn!(
1111 error = %append_err,
1112 "failed to append console send failure frame"
1113 );
1114 }
1115 }
1116 }
1117 });
1118}
1119
1120#[derive(Debug)]
1121pub enum ConsoleSendError {
1122 UnknownIdentity(String),
1123 NotAddressable(String),
1124 Retired(String),
1125 InvalidContent(String),
1126 InvalidHandlingMode(String),
1127 InvalidRequest(String),
1128 IdempotencyConflict(String),
1129 State(&'static str),
1130 Dispatch(String),
1131 Log(ConsoleLogError),
1132}
1133
1134impl std::fmt::Display for ConsoleSendError {
1135 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1136 match self {
1137 Self::UnknownIdentity(identity) => write!(f, "unknown identity: {identity}"),
1138 Self::NotAddressable(identity) => write!(f, "not addressable: {identity}"),
1139 Self::Retired(identity) => write!(f, "identity retired: {identity}"),
1140 Self::InvalidContent(message) => write!(f, "invalid content: {message}"),
1141 Self::InvalidHandlingMode(mode) => write!(f, "invalid handling mode: {mode}"),
1142 Self::InvalidRequest(message) => write!(f, "invalid request: {message}"),
1143 Self::IdempotencyConflict(key) => write!(f, "idempotency key conflict: {key}"),
1144 Self::State(message) => write!(f, "console send state error: {message}"),
1145 Self::Dispatch(message) => write!(f, "dispatch failed: {message}"),
1146 Self::Log(err) => write!(f, "console log error: {err}"),
1147 }
1148 }
1149}
1150
1151impl std::error::Error for ConsoleSendError {}
1152
1153async fn backfill_session_history(
1154 inner: Arc<AggregatorInner>,
1155 runtime_key: String,
1156 force_refresh: bool,
1157) -> ConsoleLogResult<()> {
1158 if !inner.options.session_history_backfill_enabled {
1159 return Ok(());
1160 }
1161 let Some(entry) = inner
1162 .runtimes
1163 .read()
1164 .ok()
1165 .and_then(|entries| entries.get(&runtime_key).cloned())
1166 else {
1167 return Ok(());
1168 };
1169 let members = member_sources_for_entry(&entry).await;
1170 let mut targets = Vec::new();
1171 for resolved in members {
1172 let member = resolved.member;
1173 let Some(record) = identity_record_for_member(&entry, &resolved.handle, &member).await
1174 else {
1175 continue;
1176 };
1177 if !entry.visibility_policy.identity_visible(&record) {
1178 continue;
1179 }
1180 let Some(session_id) = record.session_id.clone() else {
1181 continue;
1182 };
1183 targets.push(SessionBackfillTarget {
1184 entry: entry.clone(),
1185 record,
1186 session_id,
1187 });
1188 }
1189 backfill_session_history_targets(inner, targets, force_refresh).await
1190}
1191
/// One session whose history should be backfilled into the console log.
#[derive(Clone)]
struct SessionBackfillTarget {
    /// Registered runtime entry the session belongs to.
    entry: RuntimeEntry,
    /// Identity record resolved for the member that owns the session.
    record: ConsoleIdentityRecord,
    /// Session id copied from `record.session_id` when the target was built.
    session_id: String,
}
1198
1199async fn backfill_session_history_targets(
1200 inner: Arc<AggregatorInner>,
1201 targets: Vec<SessionBackfillTarget>,
1202 force_refresh: bool,
1203) -> ConsoleLogResult<()> {
1204 let mut tasks = tokio::task::JoinSet::new();
1205 for target in targets {
1206 tasks.spawn(backfill_one_session_history(
1207 inner.clone(),
1208 target,
1209 force_refresh,
1210 ));
1211 }
1212 let mut first_error = None;
1213 while let Some(result) = tasks.join_next().await {
1214 match result {
1215 Ok(Ok(())) => {}
1216 Ok(Err(err)) => {
1217 if first_error.is_none() {
1218 first_error = Some(err);
1219 }
1220 }
1221 Err(err) => {
1222 if first_error.is_none() {
1223 first_error = Some(Box::new(std::io::Error::other(format!(
1224 "session backfill task failed: {err}"
1225 ))) as ConsoleLogError);
1226 }
1227 }
1228 }
1229 }
1230 if let Some(err) = first_error {
1231 Err(err)
1232 } else {
1233 Ok(())
1234 }
1235}
1236
/// Backfills one session's persisted history into the console log.
///
/// A permit from `session_backfill_permits` is held for the whole run, so the
/// number of concurrent session backfills is bounded by that semaphore. Reading
/// resumes at the offset stored in the session's `SessionHistory` watermark.
/// The offset is 0 when there is no watermark or the watermark names another
/// session. Unless `force_refresh` is set, a watermark that
/// `session_history_watermark_is_fresh` accepts ends the call immediately.
///
/// Pages of up to `SESSION_HISTORY_PAGE_LIMIT` messages are converted to
/// frames. Frames that already have a non-history counterpart are skipped. The
/// rest are redacted per the runtime's visibility policy and appended. The
/// watermark is advanced after every non-empty page. Three conditions append a
/// `replay_unavailable` gap frame and end the run with `Ok(())`: a failed page
/// read, a page that cannot be serialized, or a page without `messages`.
///
/// # Errors
/// Returns an error when the limiter semaphore is closed, or when a store
/// operation fails (watermark read/write, counterpart lookup, or append).
async fn backfill_one_session_history(
    inner: Arc<AggregatorInner>,
    target: SessionBackfillTarget,
    force_refresh: bool,
) -> ConsoleLogResult<()> {
    // Bound concurrent backfills; the permit is released when `_permit` drops
    // at the end of this function.
    let _permit = inner
        .session_backfill_permits
        .clone()
        .acquire_owned()
        .await
        .map_err(|err| -> ConsoleLogError {
            Box::new(std::io::Error::other(format!(
                "session backfill limiter closed: {err}"
            )))
        })?;
    let SessionBackfillTarget {
        entry,
        record,
        session_id,
    } = target;
    // Watermarks are stored per (runtime, session) under a derived key.
    let watermark_runtime_key =
        session_history_watermark_runtime_key(&entry.runtime_key, &session_id);
    let watermark = inner
        .store
        .source_watermark(
            &watermark_runtime_key,
            ConsoleFrameSourceKind::SessionHistory,
        )
        .await?;
    let now_ms = current_time_ms();
    // Resume after the last consumed message; start at 0 when unknown.
    let mut offset = watermark
        .as_deref()
        .and_then(|watermark| parse_session_history_watermark(watermark, &session_id))
        .unwrap_or(0);
    // Skip work that was checked recently, unless the caller forces a refresh.
    if !force_refresh
        && watermark.as_deref().is_some_and(|watermark| {
            session_history_watermark_is_fresh(watermark, &session_id, now_ms)
        })
    {
        return Ok(());
    }
    loop {
        let page = match entry
            .runtime
            .read_session_history(&session_id, offset, Some(SESSION_HISTORY_PAGE_LIMIT))
            .await
        {
            Ok(page) => page,
            Err(err) => {
                // Record the gap in the timeline instead of failing the caller.
                append_backfill_gap(
                    &inner,
                    &entry.runtime_key,
                    &record.identity,
                    err.to_string(),
                )
                .await?;
                break;
            }
        };
        // The page is inspected as untyped JSON (`offset`, `messages`, `has_more`).
        let page_value = match serde_json::to_value(page) {
            Ok(value) => value,
            Err(err) => {
                append_backfill_gap(
                    &inner,
                    &entry.runtime_key,
                    &record.identity,
                    err.to_string(),
                )
                .await?;
                break;
            }
        };
        // Prefer the offset the runtime reports for this page; fall back to the
        // requested one.
        let base_offset = page_value
            .get("offset")
            .and_then(Value::as_u64)
            .unwrap_or(offset as u64) as usize;
        let Some(messages) = page_value.get("messages").and_then(Value::as_array) else {
            append_backfill_gap(
                &inner,
                &entry.runtime_key,
                &record.identity,
                "session history page missing messages".to_string(),
            )
            .await?;
            break;
        };
        if messages.is_empty() {
            // Nothing new: refresh the watermark's checked-at time only when some
            // history was already consumed.
            // NOTE(review): within this section no watermark is ever written with
            // offset 0, so an empty session is re-read on every pass and the
            // offset-0 TTL in `session_history_watermark_is_fresh` never applies
            // to it — confirm this is intended.
            if offset > 0 {
                record_session_history_watermark(
                    &inner,
                    &watermark_runtime_key,
                    &session_id,
                    offset,
                )
                .await?;
            }
            break;
        }
        for (idx, message) in messages.iter().enumerate() {
            let absolute_offset = base_offset + idx;
            let frames = frames_from_session_history_message(
                &entry.runtime_key,
                &record.identity,
                &session_id,
                absolute_offset,
                message.clone(),
            );
            for mut frame in frames {
                // Don't duplicate content already logged from a live source.
                if history_frame_has_existing_counterpart(&inner, &frame).await? {
                    continue;
                }
                if let Some(redacted) = entry.visibility_policy.redact_payload(&frame) {
                    frame.payload = redacted;
                    frame.status = ConsoleFrameStatus::Redacted;
                }
                append_and_emit(&inner, frame).await?;
            }
        }
        // Persist progress after every page so an interrupted run can resume.
        offset = base_offset + messages.len();
        record_session_history_watermark(&inner, &watermark_runtime_key, &session_id, offset)
            .await?;
        let has_more = page_value
            .get("has_more")
            .and_then(Value::as_bool)
            .unwrap_or(false);
        // A short page also ends the run, even if `has_more` is true.
        if !has_more || messages.len() < SESSION_HISTORY_PAGE_LIMIT {
            break;
        }
    }
    Ok(())
}
1368
1369async fn record_session_history_watermark(
1370 inner: &AggregatorInner,
1371 watermark_runtime_key: &str,
1372 session_id: &str,
1373 offset: usize,
1374) -> ConsoleLogResult<()> {
1375 inner
1376 .store
1377 .record_source_watermark(
1378 watermark_runtime_key,
1379 ConsoleFrameSourceKind::SessionHistory,
1380 &format_session_history_watermark(session_id, offset, current_time_ms()),
1381 )
1382 .await
1383}
1384
1385fn spawn_session_history_backfill(inner: Arc<AggregatorInner>, runtime_key: String) {
1386 if !inner.options.session_history_backfill_enabled {
1387 return;
1388 }
1389 tokio::spawn(async move {
1390 {
1391 let mut active = inner.active_session_backfills.lock().await;
1392 if !active.insert(runtime_key.clone()) {
1393 return;
1394 }
1395 }
1396 let result = backfill_session_history(inner.clone(), runtime_key.clone(), false).await;
1397 let mut active = inner.active_session_backfills.lock().await;
1398 active.remove(&runtime_key);
1399 drop(active);
1400 if let Err(err) = result {
1401 tracing::warn!(
1402 runtime_key = %runtime_key,
1403 error = %err,
1404 "console session-history backfill failed"
1405 );
1406 }
1407 });
1408}
1409
1410fn spawn_session_history_discovery_loop(inner: Arc<AggregatorInner>, runtime_key: String) {
1411 if !inner.options.session_history_backfill_enabled {
1412 return;
1413 }
1414 tokio::spawn(async move {
1415 let mut interval = tokio::time::interval(SESSION_HISTORY_DISCOVERY_INTERVAL);
1416 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
1417 loop {
1418 interval.tick().await;
1419 let runtime_still_registered = inner
1420 .runtimes
1421 .read()
1422 .ok()
1423 .is_some_and(|entries| entries.contains_key(&runtime_key));
1424 if !runtime_still_registered {
1425 break;
1426 }
1427 spawn_session_history_backfill(inner.clone(), runtime_key.clone());
1428 }
1429 });
1430}
1431
1432fn spawn_session_history_backfill_target(
1433 inner: Arc<AggregatorInner>,
1434 target: SessionBackfillTarget,
1435 force_refresh: bool,
1436) {
1437 if !inner.options.session_history_backfill_enabled {
1438 return;
1439 }
1440 tokio::spawn(async move {
1441 let active_key = targeted_session_history_active_key(&target, force_refresh);
1442 let result =
1443 run_targeted_session_history_backfill(inner.clone(), target, force_refresh).await;
1444 if let Err(err) = result {
1445 tracing::warn!(
1446 active_key = %active_key,
1447 error = %err,
1448 "console targeted session-history backfill failed"
1449 );
1450 }
1451 });
1452}
1453
1454async fn run_targeted_session_history_backfill(
1455 inner: Arc<AggregatorInner>,
1456 target: SessionBackfillTarget,
1457 force_refresh: bool,
1458) -> ConsoleLogResult<()> {
1459 let active_key = targeted_session_history_active_key(&target, force_refresh);
1460 {
1461 let mut active = inner.active_session_backfills.lock().await;
1462 if !active.insert(active_key.clone()) {
1463 return Ok(());
1464 }
1465 }
1466 let result = backfill_one_session_history(inner.clone(), target, force_refresh).await;
1467 let mut active = inner.active_session_backfills.lock().await;
1468 active.remove(&active_key);
1469 result
1470}
1471
1472fn targeted_session_history_active_key(
1473 target: &SessionBackfillTarget,
1474 force_refresh: bool,
1475) -> String {
1476 let mode = if force_refresh { "force" } else { "refresh" };
1477 format!(
1478 "{}:session-history:{}:{mode}",
1479 target.entry.runtime_key, target.session_id
1480 )
1481}
1482
1483fn spawn_session_history_backfill_for_identity(
1484 inner: Arc<AggregatorInner>,
1485 identity: String,
1486 force_refresh: bool,
1487) {
1488 if !inner.options.session_history_backfill_enabled {
1489 return;
1490 }
1491 tokio::spawn(async move {
1492 let Some(target) = session_backfill_target_for_identity(&inner, &identity).await else {
1493 return;
1494 };
1495 spawn_session_history_backfill_target(inner, target, force_refresh);
1496 });
1497}
1498
1499async fn backfill_identity_for_explicit_query(inner: Arc<AggregatorInner>, identity: String) {
1500 if !inner.options.session_history_backfill_enabled {
1501 return;
1502 }
1503 let Some(target) = session_backfill_target_for_identity(&inner, &identity).await else {
1504 return;
1505 };
1506 let (tx, rx) = oneshot::channel();
1507 tokio::spawn(async move {
1508 let result = run_targeted_session_history_backfill(inner, target, true)
1509 .await
1510 .map_err(|err| err.to_string());
1511 let _ = tx.send(result);
1512 });
1513 match tokio::time::timeout(EXPLICIT_IDENTITY_BACKFILL_WAIT, rx).await {
1514 Ok(Ok(Ok(())) | Err(_)) | Err(_) => {}
1515 Ok(Ok(Err(err))) => {
1516 tracing::warn!(
1517 identity = %identity,
1518 error = %err,
1519 "console explicit identity session-history refresh failed"
1520 );
1521 }
1522 }
1523}
1524
1525fn spawn_opportunistic_session_history_backfill_for_identity(
1526 inner: Arc<AggregatorInner>,
1527 identity: String,
1528) {
1529 if !inner.options.session_history_backfill_enabled {
1530 return;
1531 }
1532 tokio::spawn(async move {
1533 let Some(target) = session_backfill_target_for_identity(&inner, &identity).await else {
1534 return;
1535 };
1536 let active_key = format!(
1537 "{}:session-history:{}",
1538 target.entry.runtime_key, target.session_id
1539 );
1540 {
1541 let mut seen = inner.opportunistic_session_backfills.lock().await;
1542 if !seen.insert(active_key) {
1543 return;
1544 }
1545 }
1546 spawn_session_history_backfill_target(inner, target, false);
1547 });
1548}
1549
1550async fn session_backfill_target_for_identity(
1551 inner: &AggregatorInner,
1552 identity: &str,
1553) -> Option<SessionBackfillTarget> {
1554 let entries = inner
1555 .runtimes
1556 .read()
1557 .ok()
1558 .map(|entries| entries.clone())
1559 .unwrap_or_default();
1560 for entry in entries.values() {
1561 let Some(raw_identity) = strip_namespace(identity, &entry.identity_namespace) else {
1562 continue;
1563 };
1564 let mid = MeerkatId::from(raw_identity.as_str());
1565 for resolved in member_sources_for_entry(entry)
1566 .await
1567 .into_iter()
1568 .filter(|candidate| candidate.member.agent_identity == mid)
1569 {
1570 let Some(record) =
1571 identity_record_for_member(entry, &resolved.handle, &resolved.member).await
1572 else {
1573 continue;
1574 };
1575 if !entry.visibility_policy.identity_visible(&record) {
1576 continue;
1577 }
1578 let Some(session_id) = record.session_id.clone() else {
1579 continue;
1580 };
1581 return Some(SessionBackfillTarget {
1582 entry: entry.clone(),
1583 record,
1584 session_id,
1585 });
1586 }
1587 }
1588 None
1589}
1590
1591async fn recover_lagged_source_events(
1592 inner: Arc<AggregatorInner>,
1593 runtime_key: &str,
1594 console_events: &ConsoleEventStore,
1595) -> ConsoleLogResult<()> {
1596 let watermark = inner
1597 .store
1598 .source_watermark(runtime_key, ConsoleFrameSourceKind::ConsoleEvent)
1599 .await?;
1600 match console_events.replay_all(watermark.as_deref()).await {
1601 Ok(events) => {
1602 for envelope in events {
1603 project_console_event(inner.clone(), runtime_key, envelope).await?;
1604 }
1605 }
1606 Err(err) => {
1607 append_source_gap(
1608 &inner,
1609 runtime_key,
1610 format!(
1611 "{}:{}:{}",
1612 err.error, err.stream, err.requested_last_event_id
1613 ),
1614 )
1615 .await?;
1616 }
1617 }
1618 Ok(())
1619}
1620
1621async fn append_source_gap(
1622 inner: &AggregatorInner,
1623 runtime_key: &str,
1624 reason: String,
1625) -> ConsoleLogResult<()> {
1626 append_and_emit(
1627 inner,
1628 NewConsoleFrame {
1629 id: None,
1630 dedupe_key: format!("source-gap:{runtime_key}:{}", current_time_ms()),
1631 timestamp_ms: current_time_ms(),
1632 runtime_key: runtime_key.to_string(),
1633 identity: "__console__".to_string(),
1634 conversation_id: None,
1635 session_id: None,
1636 kind: "replay_unavailable".to_string(),
1637 status: ConsoleFrameStatus::DeliveryFailed,
1638 payload: json!({
1639 "reason": reason,
1640 "source_kind": "console_event",
1641 }),
1642 source: ConsoleFrameSource {
1643 kind: ConsoleFrameSourceKind::Synthetic,
1644 source_cursor: None,
1645 },
1646 source_event_id: None,
1647 interaction_id: None,
1648 turn_id: None,
1649 run_id: None,
1650 parent_frame_id: None,
1651 caused_by_frame_id: None,
1652 },
1653 )
1654 .await?;
1655 let _ = inner
1656 .event_tx
1657 .send(ConsoleTimelineEvent::ReplayUnavailable {
1658 requested_cursor: format!("source-gap:{runtime_key}"),
1659 latest_cursor: inner.store.latest_cursor().await.ok().flatten(),
1660 });
1661 Ok(())
1662}
1663
1664async fn append_backfill_gap(
1665 inner: &AggregatorInner,
1666 runtime_key: &str,
1667 identity: &str,
1668 reason: String,
1669) -> ConsoleLogResult<()> {
1670 append_and_emit(
1671 inner,
1672 NewConsoleFrame {
1673 id: None,
1674 dedupe_key: format!(
1675 "session-backfill-gap:{runtime_key}:{identity}:{}",
1676 current_time_ms()
1677 ),
1678 timestamp_ms: current_time_ms(),
1679 runtime_key: runtime_key.to_string(),
1680 identity: identity.to_string(),
1681 conversation_id: Some(identity.to_string()),
1682 session_id: None,
1683 kind: "replay_unavailable".to_string(),
1684 status: ConsoleFrameStatus::DeliveryFailed,
1685 payload: json!({
1686 "reason": reason,
1687 "source_kind": "session_history",
1688 }),
1689 source: ConsoleFrameSource {
1690 kind: ConsoleFrameSourceKind::Synthetic,
1691 source_cursor: None,
1692 },
1693 source_event_id: None,
1694 interaction_id: None,
1695 turn_id: None,
1696 run_id: None,
1697 parent_frame_id: None,
1698 caused_by_frame_id: None,
1699 },
1700 )
1701 .await?;
1702 Ok(())
1703}
1704
1705async fn project_console_event(
1706 inner: Arc<AggregatorInner>,
1707 runtime_key: &str,
1708 envelope: crate::console_contracts::ConsoleIdentityEventEnvelope,
1709) -> ConsoleLogResult<()> {
1710 let Some(entry) = inner
1711 .runtimes
1712 .read()
1713 .ok()
1714 .and_then(|entries| entries.get(runtime_key).cloned())
1715 else {
1716 return Ok(());
1717 };
1718 let mut frame = frame_from_console_event(&entry, envelope);
1719 if let Some(redacted) = entry.visibility_policy.redact_payload(&frame) {
1720 frame.payload = redacted;
1721 frame.status = ConsoleFrameStatus::Redacted;
1722 }
1723 let source_cursor = frame
1724 .source_event_id
1725 .clone()
1726 .unwrap_or_else(|| frame.dedupe_key.clone());
1727 let refresh_identity = if console_event_should_refresh_session_history(&frame) {
1728 Some(frame.identity.clone())
1729 } else {
1730 None
1731 };
1732 let opportunistic_refresh_identity = if refresh_identity.is_none()
1733 && console_event_should_start_session_history_backfill(&frame)
1734 {
1735 Some(frame.identity.clone())
1736 } else {
1737 None
1738 };
1739 append_and_emit(&inner, frame).await?;
1740 inner
1741 .store
1742 .record_source_watermark(
1743 &entry.runtime_key,
1744 ConsoleFrameSourceKind::ConsoleEvent,
1745 &source_cursor,
1746 )
1747 .await?;
1748 if let Some(identity) = refresh_identity {
1749 spawn_session_history_backfill_for_identity(inner.clone(), identity, true);
1750 } else if let Some(identity) = opportunistic_refresh_identity {
1751 spawn_opportunistic_session_history_backfill_for_identity(inner.clone(), identity);
1752 }
1753 Ok(())
1754}
1755
1756fn console_event_should_refresh_session_history(frame: &NewConsoleFrame) -> bool {
1757 matches!(
1758 frame.kind.as_str(),
1759 "interaction_complete" | "interaction_failed" | "message_delivery_failed"
1760 ) || frame.session_id.is_some()
1761}
1762
1763fn console_event_should_start_session_history_backfill(frame: &NewConsoleFrame) -> bool {
1764 if frame.identity == SYSTEM_EVENT_IDENTITY {
1765 return false;
1766 }
1767 matches!(
1768 frame.kind.as_str(),
1769 "turn_started"
1770 | "run_started"
1771 | "reasoning_complete"
1772 | "tool_call_requested"
1773 | "tool_call"
1774 | "tool_execution_started"
1775 | "text_delta"
1776 | "system_notice"
1777 )
1778}
1779
1780async fn append_and_emit(
1781 inner: &AggregatorInner,
1782 frame: NewConsoleFrame,
1783) -> ConsoleLogResult<AppendOutcome> {
1784 let outcome = inner.store.append_if_absent(frame).await?;
1785 if outcome.disposition == AppendDisposition::Inserted {
1786 let _ = inner.event_tx.send(ConsoleTimelineEvent::ConsoleFrame {
1787 frame: outcome.frame.clone(),
1788 });
1789 }
1790 Ok(outcome)
1791}
1792
1793async fn update_frame_status_and_emit(
1794 inner: &AggregatorInner,
1795 frame_id: &str,
1796 status: ConsoleFrameStatus,
1797) -> ConsoleLogResult<Option<ConsoleFrame>> {
1798 let Some(updated) = inner.store.update_frame_status(frame_id, status).await? else {
1799 return Ok(None);
1800 };
1801 let update_marker = NewConsoleFrame {
1802 id: None,
1803 dedupe_key: format!("frame-update:{}:{}", updated.id, updated.frame_version),
1804 timestamp_ms: updated.updated_at_ms.unwrap_or_else(current_time_ms),
1805 runtime_key: updated.runtime_key.clone(),
1806 identity: updated.identity.clone(),
1807 conversation_id: updated.conversation_id.clone(),
1808 session_id: updated.session_id.clone(),
1809 kind: "frame_updated".to_string(),
1810 status: updated.status,
1811 payload: json!({ "frame": updated.clone() }),
1812 source: ConsoleFrameSource {
1813 kind: ConsoleFrameSourceKind::Synthetic,
1814 source_cursor: None,
1815 },
1816 source_event_id: None,
1817 interaction_id: updated.interaction_id.clone(),
1818 turn_id: updated.turn_id.clone(),
1819 run_id: updated.run_id.clone(),
1820 parent_frame_id: Some(updated.id.clone()),
1821 caused_by_frame_id: Some(updated.id.clone()),
1822 };
1823 let outcome = inner.store.append_if_absent(update_marker).await?;
1824 if outcome.disposition == AppendDisposition::Inserted {
1825 let _ = inner.event_tx.send(ConsoleTimelineEvent::ConsoleFrame {
1826 frame: outcome.frame,
1827 });
1828 }
1829 Ok(Some(updated))
1830}
1831
1832fn frame_from_console_event(
1833 entry: &RuntimeEntry,
1834 envelope: crate::console_contracts::ConsoleIdentityEventEnvelope,
1835) -> NewConsoleFrame {
1836 let turn_id = envelope
1837 .data
1838 .get("turn_id")
1839 .and_then(Value::as_str)
1840 .map(ToString::to_string);
1841 let run_id = envelope
1842 .data
1843 .get("run_id")
1844 .and_then(Value::as_str)
1845 .map(ToString::to_string);
1846 let status = match envelope.event_type.as_str() {
1847 "interaction_started" => ConsoleFrameStatus::Accepted,
1848 "interaction_failed" | "run_failed" => ConsoleFrameStatus::DeliveryFailed,
1849 "interaction_complete" | "run_completed" => ConsoleFrameStatus::Completed,
1850 _ => ConsoleFrameStatus::Delivered,
1851 };
1852 let identity = apply_namespace(&envelope.identity, &entry.identity_namespace);
1853 NewConsoleFrame {
1854 id: Some(envelope.event_id.clone()),
1855 dedupe_key: format!("console-event:{}:{}", entry.runtime_key, envelope.event_id),
1856 timestamp_ms: envelope.timestamp_ms,
1857 runtime_key: entry.runtime_key.clone(),
1858 identity: identity.clone(),
1859 conversation_id: Some(identity),
1860 session_id: envelope
1861 .data
1862 .get("session_id")
1863 .and_then(Value::as_str)
1864 .map(ToString::to_string),
1865 kind: envelope.event_type,
1866 status,
1867 payload: envelope.data,
1868 source: ConsoleFrameSource {
1869 kind: ConsoleFrameSourceKind::ConsoleEvent,
1870 source_cursor: None,
1871 },
1872 source_event_id: Some(envelope.event_id),
1873 interaction_id: envelope.interaction_id,
1874 turn_id,
1875 run_id,
1876 parent_frame_id: None,
1877 caused_by_frame_id: None,
1878 }
1879}
1880
/// Test helper: the first frame produced for a session-history message, if any.
#[cfg(test)]
fn frame_from_session_history_message(
    runtime_key: &str,
    identity: &str,
    session_id: &str,
    offset: usize,
    message: Value,
) -> Option<NewConsoleFrame> {
    let mut frames =
        frames_from_session_history_message(runtime_key, identity, session_id, offset, message);
    if frames.is_empty() {
        None
    } else {
        Some(frames.swap_remove(0))
    }
}
1893
/// Converts one raw session-history message into `SessionHistory` frames.
///
/// The message is parsed as a `Message`; unparseable messages yield no frames.
/// - `ToolResults`: one `tool_execution_completed` frame per result. The
///   result's index is part of the dedupe key and the source cursor.
/// - `User`: a `user_input` frame, unless the content is scaffold noise.
/// - `Assistant` / `BlockAssistant`: an `interaction_complete` frame whose
///   `result`/`text` is the assistant text (block text joined with no separator).
/// - `SystemNotice`: a `system_notice` frame.
/// - `System`: no frames.
///
/// Dedupe keys combine runtime, session, `offset` and a short hash of the raw
/// message JSON, so the same message at the same offset is appended only once.
/// All frames are `Completed` and carry the raw `message` in their payload,
/// except tool results.
fn frames_from_session_history_message(
    runtime_key: &str,
    identity: &str,
    session_id: &str,
    offset: usize,
    message: Value,
) -> Vec<NewConsoleFrame> {
    let payload_hash = hash_short(&serde_json::to_string(&message).unwrap_or_default());
    let Some(parsed) = serde_json::from_value::<Message>(message.clone()).ok() else {
        return Vec::new();
    };
    // Tool results fan out into one frame per result, so handle them first.
    if let Message::ToolResults {
        results,
        created_at,
    } = parsed
    {
        return results
            .into_iter()
            .enumerate()
            .map(|(idx, result)| {
                let content = serde_json::to_value(&result.content).unwrap_or(Value::Null);
                let result_text = result.text_content();
                let tool_use_id = result.tool_use_id.clone();
                NewConsoleFrame {
                    id: None,
                    dedupe_key: format!(
                        "session-history:{runtime_key}:{session_id}:{offset}:{idx}:{payload_hash}"
                    ),
                    // Negative timestamps clamp to 0 before the u64 cast.
                    timestamp_ms: created_at.timestamp_millis().max(0) as u64,
                    runtime_key: runtime_key.to_string(),
                    identity: identity.to_string(),
                    conversation_id: Some(identity.to_string()),
                    session_id: Some(session_id.to_string()),
                    kind: "tool_execution_completed".to_string(),
                    status: ConsoleFrameStatus::Completed,
                    payload: json!({
                        "id": tool_use_id,
                        "tool_call_id": tool_use_id,
                        "result": result_text,
                        "content": content,
                        "is_error": result.is_error,
                        "source_event_type": "session_history",
                        "type": "session_history",
                    }),
                    source: ConsoleFrameSource {
                        kind: ConsoleFrameSourceKind::SessionHistory,
                        source_cursor: Some(format!("{session_id}:{offset}:{idx}")),
                    },
                    source_event_id: None,
                    interaction_id: None,
                    turn_id: None,
                    run_id: None,
                    parent_frame_id: None,
                    caused_by_frame_id: None,
                }
            })
            .collect();
    }
    let (kind, timestamp_ms, payload) = match &parsed {
        Message::User(user) => {
            // Drop injected scaffold text (peer updates, spawn preambles).
            if session_history_user_message_is_scaffold(&message) {
                return Vec::new();
            }
            (
                "user_input",
                user.created_at.timestamp_millis().max(0) as u64,
                json!({
                    "content": user.content,
                    "message": message,
                }),
            )
        }
        Message::Assistant(assistant) => (
            "interaction_complete",
            assistant.created_at.timestamp_millis().max(0) as u64,
            json!({
                "result": assistant.content,
                "text": assistant.content,
                "message": message,
                "source_event_type": "session_history",
                "type": "session_history",
            }),
        ),
        Message::BlockAssistant(assistant) => {
            let text = assistant.text_blocks().collect::<Vec<_>>().join("");
            (
                "interaction_complete",
                assistant.created_at.timestamp_millis().max(0) as u64,
                json!({
                    "result": text,
                    "text": text,
                    "message": message,
                    "source_event_type": "session_history",
                    "type": "session_history",
                }),
            )
        }
        Message::SystemNotice(notice) => (
            "system_notice",
            notice.created_at.timestamp_millis().max(0) as u64,
            json!({
                "message": message,
                "kind": notice.kind,
                "render_class": notice.kind.render_class(),
                "body": notice.body,
                "blocks": notice.blocks,
                "source_event_type": "session_history",
                "type": "session_history",
            }),
        ),
        // `ToolResults` was already returned above; the arm only keeps the match
        // exhaustive.
        Message::System(_) | Message::ToolResults { .. } => return Vec::new(),
    };
    vec![NewConsoleFrame {
        id: None,
        dedupe_key: format!("session-history:{runtime_key}:{session_id}:{offset}:{payload_hash}"),
        timestamp_ms,
        runtime_key: runtime_key.to_string(),
        identity: identity.to_string(),
        conversation_id: Some(identity.to_string()),
        session_id: Some(session_id.to_string()),
        kind: kind.to_string(),
        status: ConsoleFrameStatus::Completed,
        payload,
        source: ConsoleFrameSource {
            kind: ConsoleFrameSourceKind::SessionHistory,
            source_cursor: Some(format!("{session_id}:{offset}")),
        },
        source_event_id: None,
        interaction_id: None,
        turn_id: None,
        run_id: None,
        parent_frame_id: None,
        caused_by_frame_id: None,
    }]
}
2029
2030fn session_history_user_message_is_scaffold(message: &Value) -> bool {
2031 message
2032 .get("content")
2033 .is_some_and(scaffold_message_content_is_noise)
2034}
2035
2036fn scaffold_message_content_is_noise(value: &Value) -> bool {
2037 match value {
2038 Value::String(text) => scaffold_message_text_is_noise(text),
2039 Value::Array(items) => items.iter().any(scaffold_message_content_is_noise),
2040 Value::Object(map) => ["text", "content", "message"]
2041 .iter()
2042 .filter_map(|key| map.get(*key))
2043 .any(scaffold_message_content_is_noise),
2044 _ => false,
2045 }
2046}
2047
/// True when `text`, ignoring leading whitespace, starts with the
/// case-sensitive marker `[PEER UPDATE]` or with "you have been spawned" in any
/// ASCII case.
///
/// The case-insensitive prefix is compared in place rather than lowercasing the
/// entire message, which allocated a copy of arbitrarily long text on every call.
fn scaffold_message_text_is_noise(text: &str) -> bool {
    const SPAWN_PREFIX: &str = "you have been spawned";
    let trimmed = text.trim_start();
    trimmed.starts_with("[PEER UPDATE]")
        || trimmed
            .get(..SPAWN_PREFIX.len())
            .is_some_and(|head| head.eq_ignore_ascii_case(SPAWN_PREFIX))
}
2055
/// Extracts the message offset from a `"{session_id}:{offset}[:...]"`
/// watermark. Returns `None` if the watermark belongs to another session or
/// the offset is not a valid `usize`.
fn parse_session_history_watermark(watermark: &str, session_id: &str) -> Option<usize> {
    let after_session = watermark
        .strip_prefix(session_id)
        .and_then(|rest| rest.strip_prefix(':'))?;
    let offset_text = match after_session.split_once(':') {
        Some((offset_text, _)) => offset_text,
        None => after_session,
    };
    offset_text.parse::<usize>().ok()
}
2060
/// Serializes a session-history watermark as
/// `"{session_id}:{offset}:{checked_at_ms}"`.
fn format_session_history_watermark(session_id: &str, offset: usize, checked_at_ms: u64) -> String {
    [
        session_id.to_owned(),
        offset.to_string(),
        checked_at_ms.to_string(),
    ]
    .join(":")
}
2064
2065fn session_history_watermark_is_fresh(watermark: &str, session_id: &str, now_ms: u64) -> bool {
2066 let Some(checked_at_ms) = watermark
2067 .rsplit_once(':')
2068 .and_then(|(_, checked_at_ms)| checked_at_ms.parse::<u64>().ok())
2069 else {
2070 return false;
2071 };
2072 let offset = parse_session_history_watermark(watermark, session_id).unwrap_or(0);
2073 let ttl_ms = if offset > 0 {
2074 SESSION_HISTORY_GROWING_REFRESH_TTL_MS
2075 } else {
2076 SESSION_HISTORY_REFRESH_TTL_MS
2077 };
2078 now_ms.saturating_sub(checked_at_ms) < ttl_ms
2079}
2080
/// Reports whether a session-history frame duplicates content that is already
/// logged from a non-`SessionHistory` source for the same identity and
/// conversation.
///
/// Frames without a transcript fingerprint are never duplicates. Existing
/// frames are scanned page by page (1 000 per query). A frame is considered
/// only when its session is compatible: equal session ids, or either side has
/// none. An existing frame matches when it shares the fingerprint. For
/// assistant terminal frames it also matches when its `text_delta` texts,
/// concatenated per interaction id (falling back to turn id, run id, then a
/// shared "session" bucket), normalize to the fingerprint.
///
/// NOTE(review): the timeline is re-scanned for every history frame, so a full
/// backfill of a long conversation can be quadratic in its length.
async fn history_frame_has_existing_counterpart(
    inner: &AggregatorInner,
    frame: &NewConsoleFrame,
) -> ConsoleLogResult<bool> {
    let fingerprint = transcript_fingerprint(&frame.kind, &frame.payload);
    let Some(fingerprint) = fingerprint else {
        return Ok(false);
    };
    // Only assistant terminal frames can be reconstructed from streamed deltas.
    let assistant_terminal = assistant_terminal_fingerprint(&frame.kind, &frame.payload).is_some();
    let mut delta_text_by_turn = BTreeMap::<String, String>::new();
    let mut after = None;
    loop {
        let page = inner
            .store
            .query_frames(ConsoleTimelineQuery {
                identity: Some(frame.identity.clone()),
                conversation_id: frame.conversation_id.clone(),
                after,
                limit: 1_000,
            })
            .await?;
        for existing in &page.frames {
            let same_session = existing.session_id == frame.session_id
                || existing.session_id.is_none()
                || frame.session_id.is_none();
            // History frames never count as counterparts of other history frames.
            if existing.source.kind == ConsoleFrameSourceKind::SessionHistory || !same_session {
                continue;
            }
            if transcript_fingerprint(&existing.kind, &existing.payload).as_ref()
                == Some(&fingerprint)
            {
                return Ok(true);
            }
            if assistant_terminal
                && let Some(delta) = text_delta_payload_text(&existing.kind, &existing.payload)
            {
                // Accumulate streamed text per turn and compare after each delta.
                let turn_key = existing
                    .interaction_id
                    .as_deref()
                    .or(existing.turn_id.as_deref())
                    .or(existing.run_id.as_deref())
                    .unwrap_or("session");
                let aggregated = delta_text_by_turn.entry(turn_key.to_string()).or_default();
                aggregated.push_str(delta);
                if normalize_transcript_fingerprint_text(aggregated) == fingerprint {
                    return Ok(true);
                }
            }
        }
        if page.frames.is_empty() || page.next_cursor.is_none() {
            return Ok(false);
        }
        after = page.next_cursor;
    }
}
2136
/// Store key for a session's history watermark:
/// `"{runtime_key}:session-history:{session_id}"`.
fn session_history_watermark_runtime_key(runtime_key: &str, session_id: &str) -> String {
    [runtime_key, "session-history", session_id].join(":")
}
2140
2141fn transcript_fingerprint(kind: &str, payload: &Value) -> Option<String> {
2142 match kind {
2143 "user_input" | "interaction_started" => payload
2144 .get("content")
2145 .map(stable_value_fingerprint)
2146 .or_else(|| payload.get("message").map(stable_value_fingerprint)),
2147 "text_delta" => {
2148 text_delta_payload_text(kind, payload).map(normalize_transcript_fingerprint_text)
2149 }
2150 "text_complete" | "interaction_complete" | "run_completed" => {
2151 assistant_terminal_fingerprint(kind, payload)
2152 }
2153 _ => None,
2154 }
2155}
2156
2157fn assistant_terminal_fingerprint(kind: &str, payload: &Value) -> Option<String> {
2158 match kind {
2159 "text_complete" | "interaction_complete" | "run_completed" => payload
2160 .get("text")
2161 .or_else(|| payload.get("result"))
2162 .or_else(|| payload.get("content"))
2163 .map(stable_value_fingerprint),
2164 _ => None,
2165 }
2166}
2167
2168fn text_delta_payload_text<'a>(kind: &str, payload: &'a Value) -> Option<&'a str> {
2169 if kind != "text_delta" {
2170 return None;
2171 }
2172 payload
2173 .get("delta")
2174 .or_else(|| payload.get("text"))
2175 .or_else(|| payload.get("content"))
2176 .and_then(Value::as_str)
2177 .or_else(|| payload.as_str())
2178}
2179
2180fn stable_value_fingerprint(value: &Value) -> String {
2181 match value {
2182 Value::String(text) => normalize_transcript_fingerprint_text(text),
2183 other => serde_json::to_string(other).unwrap_or_default(),
2184 }
2185}
2186
/// Normalises transcript text for fingerprint comparison.
///
/// Surrounding whitespace is trimmed. A leading `"[EVENT via rpc] "` marker (with its
/// trailing space) is dropped if present, and the remainder is trimmed again.
fn normalize_transcript_fingerprint_text(text: &str) -> String {
    const RPC_EVENT_MARKER: &str = "[EVENT via rpc] ";
    let text = text.trim();
    let text = match text.strip_prefix(RPC_EVENT_MARKER) {
        Some(rest) => rest,
        None => text,
    };
    text.trim().to_owned()
}
2195
/// Memoised outcome of resolving a frame's identity to a live runtime member.
#[derive(Copy, Clone, Debug)]
enum CachedIdentityVisibility {
    /// A member was found and its identity record passes the runtime's identity policy.
    Visible,
    /// A member was found but its identity record was rejected (or unavailable).
    Hidden,
    /// No live member matched the identity; its frames are treated as historical.
    Missing,
}
2202
2203async fn frame_is_visible(
2204 inner: &AggregatorInner,
2205 frame: &ConsoleFrame,
2206 allow_historical_identity: bool,
2207) -> ConsoleLogResult<bool> {
2208 let mut identity_visibility_cache = HashMap::new();
2209 frame_is_visible_cached(
2210 inner,
2211 frame,
2212 allow_historical_identity,
2213 &mut identity_visibility_cache,
2214 )
2215 .await
2216}
2217
2218async fn frame_is_visible_cached(
2219 inner: &AggregatorInner,
2220 frame: &ConsoleFrame,
2221 allow_historical_identity: bool,
2222 identity_visibility_cache: &mut HashMap<(String, String), CachedIdentityVisibility>,
2223) -> ConsoleLogResult<bool> {
2224 let entry = {
2225 let entries = inner
2226 .runtimes
2227 .read()
2228 .map_err(|_| runtime_registry_lock_error())?;
2229 if entries.is_empty() {
2230 return Ok(true);
2231 }
2232 let Some(entry) = entries.get(&frame.runtime_key) else {
2233 return Ok(false);
2234 };
2235 entry.clone()
2236 };
2237 if frame.identity != "__console__" {
2238 let cache_key = (frame.runtime_key.clone(), frame.identity.clone());
2239 let identity_visibility =
2240 if let Some(cached) = identity_visibility_cache.get(&cache_key).copied() {
2241 cached
2242 } else {
2243 let runtime_member_id = strip_namespace(&frame.identity, &entry.identity_namespace)
2244 .unwrap_or_else(|| frame.identity.clone());
2245 let runtime_member = MeerkatId::from(runtime_member_id.as_str());
2246 let visibility = match member_sources_for_entry(&entry)
2247 .await
2248 .into_iter()
2249 .find(|member| member.member.agent_identity == runtime_member)
2250 {
2251 Some(resolved) => {
2252 match identity_record_for_member(&entry, &resolved.handle, &resolved.member)
2253 .await
2254 {
2255 Some(record) if entry.visibility_policy.identity_visible(&record) => {
2256 CachedIdentityVisibility::Visible
2257 }
2258 Some(_) => CachedIdentityVisibility::Hidden,
2259 None => CachedIdentityVisibility::Hidden,
2260 }
2261 }
2262 None => CachedIdentityVisibility::Missing,
2263 };
2264 identity_visibility_cache.insert(cache_key, visibility);
2265 visibility
2266 };
2267 match identity_visibility {
2268 CachedIdentityVisibility::Visible => {}
2269 CachedIdentityVisibility::Hidden => return Ok(false),
2270 CachedIdentityVisibility::Missing => {
2271 return Ok(
2272 allow_historical_identity && entry.visibility_policy.frame_visible(frame)
2273 );
2274 }
2275 }
2276 }
2277 Ok(entry.visibility_policy.frame_visible(frame))
2278}
2279
2280async fn identity_record_for_member(
2281 entry: &RuntimeEntry,
2282 handle: &MobHandle,
2283 member: &MobMemberListEntry,
2284) -> Option<ConsoleIdentityRecord> {
2285 let runtime_member_id = member.agent_identity.to_string();
2286 let durable_identity = member
2287 .labels
2288 .get("agent_identity")
2289 .filter(|value| !value.trim().is_empty())
2290 .map_or(runtime_member_id.as_str(), String::as_str);
2291 let identity = apply_namespace(durable_identity, &entry.identity_namespace);
2292 let addressable = member_is_addressable(member);
2293 let visibility = if member.state == meerkat_mob::MemberState::Retiring {
2294 ConsoleVisibility::RetiredReadable
2295 } else if addressable {
2296 ConsoleVisibility::Addressable
2297 } else {
2298 ConsoleVisibility::Hidden
2299 };
2300 let session_id = handle
2301 .resolve_bridge_session_id(&member.agent_identity)
2302 .await
2303 .map(|sid| sid.to_string());
2304 let display_name = member
2305 .labels
2306 .get("display_name")
2307 .cloned()
2308 .unwrap_or_else(|| runtime_member_id.clone());
2309 let mut labels = member.labels.clone();
2310 labels
2311 .entry("role".to_string())
2312 .or_insert_with(|| member.role.to_string());
2313 Some(ConsoleIdentityRecord {
2314 identity,
2315 display_name,
2316 runtime_key: entry.runtime_key.clone(),
2317 runtime_member_id,
2318 session_id,
2319 visibility,
2320 addressable,
2321 health: if addressable {
2322 "ready"
2323 } else {
2324 "hidden_by_policy"
2325 }
2326 .to_string(),
2327 labels,
2328 })
2329}
2330
/// Reports whether a member is a delegate spawned on behalf of another mob.
///
/// True when the role is `delegate` (ASCII case-insensitive) and the labels carry a
/// `source_mob_id` key; the label's value is not inspected.
pub(crate) fn is_implicit_delegate_member(
    role: &str,
    labels: &std::collections::BTreeMap<String, String>,
) -> bool {
    let has_source_mob = labels.contains_key("source_mob_id");
    has_source_mob && role.eq_ignore_ascii_case("delegate")
}
2337
2338fn member_is_addressable(member: &MobMemberListEntry) -> bool {
2339 member
2340 .labels
2341 .get("addressable")
2342 .map(|value| !value.eq_ignore_ascii_case("false"))
2343 .unwrap_or(true)
2344}
2345
/// Prefixes `identity` with `namespace` as `"{namespace}/{identity}"`.
///
/// Whitespace and any leading/trailing `/` are stripped from the namespace first. A
/// namespace that ends up empty leaves the identity unchanged.
fn apply_namespace(identity: &str, namespace: &str) -> String {
    match namespace.trim().trim_matches('/') {
        "" => identity.to_owned(),
        prefix => format!("{prefix}/{identity}"),
    }
}
2354
/// Inverse of [`apply_namespace`]: removes a `"{namespace}/"` prefix from `identity`.
///
/// The namespace is normalised the same way (whitespace and outer `/` stripped). An
/// empty namespace returns the identity unchanged. `None` means the identity does not
/// live in that namespace.
fn strip_namespace(identity: &str, namespace: &str) -> Option<String> {
    let prefix = namespace.trim().trim_matches('/');
    if prefix.is_empty() {
        Some(identity.to_owned())
    } else {
        let after_prefix = identity.strip_prefix(prefix)?;
        after_prefix.strip_prefix('/').map(str::to_owned)
    }
}
2365
2366fn validate_send_request(request: &ConsoleSendRequest) -> Result<(), ConsoleSendError> {
2367 if request.identity.trim().is_empty() {
2368 return Err(ConsoleSendError::InvalidRequest(
2369 "identity must be non-empty".to_string(),
2370 ));
2371 }
2372 if request.origin.trim().is_empty() {
2373 return Err(ConsoleSendError::InvalidRequest(
2374 "origin must be non-empty".to_string(),
2375 ));
2376 }
2377 if request.idempotency_key.trim().is_empty() {
2378 return Err(ConsoleSendError::InvalidRequest(
2379 "idempotency_key must be non-empty".to_string(),
2380 ));
2381 }
2382 Ok(())
2383}
2384
2385fn content_input_from_value(value: &Value) -> Result<ContentInput, ConsoleSendError> {
2386 let content: ContentInput = serde_json::from_value(value.clone())
2387 .map_err(|err| ConsoleSendError::InvalidContent(err.to_string()))?;
2388 match &content {
2389 ContentInput::Text(text) if text.trim().is_empty() => Err(
2390 ConsoleSendError::InvalidContent("content must be non-empty".to_string()),
2391 ),
2392 ContentInput::Blocks(blocks) if blocks.is_empty() => Err(ConsoleSendError::InvalidContent(
2393 "content blocks must be non-empty".to_string(),
2394 )),
2395 _ => Ok(content),
2396 }
2397}
2398
2399fn parse_handling_mode(
2400 value: Option<&str>,
2401) -> Result<meerkat_core::types::HandlingMode, ConsoleSendError> {
2402 match value.unwrap_or("queue") {
2403 "queue" => Ok(meerkat_core::types::HandlingMode::Queue),
2404 "steer" => Ok(meerkat_core::types::HandlingMode::Steer),
2405 other => Err(ConsoleSendError::InvalidHandlingMode(other.to_string())),
2406 }
2407}
2408
2409fn accepted_from_frame(frame: &ConsoleFrame) -> ConsoleInteractionAccepted {
2410 ConsoleInteractionAccepted {
2411 interaction_id: frame
2412 .interaction_id
2413 .clone()
2414 .unwrap_or_else(|| format!("console-interaction-{}", hash_short(&frame.dedupe_key))),
2415 identity: frame.identity.clone(),
2416 conversation_id: frame.conversation_id.clone(),
2417 session_id: frame.session_id.clone(),
2418 input_frame_id: frame.id.clone(),
2419 cursor: frame.cursor.clone(),
2420 status: frame.status,
2421 }
2422}
2423
/// Builds the dedupe key identifying one idempotent console send.
///
/// Format: `"send:{runtime_key}:{identity}:{origin}:{idempotency_key}"`.
///
/// NOTE(review): components are joined with `:` without escaping, and identities/origins
/// may themselves contain `:`. The format is kept as-is because these keys are persisted.
fn send_dedupe_key(
    runtime_key: &str,
    identity: &str,
    origin: &str,
    idempotency_key: &str,
) -> String {
    ["send", runtime_key, identity, origin, idempotency_key].join(":")
}
2432
2433fn send_request_fingerprint(origin: &str, content: &Value, handling_mode: &str) -> String {
2434 let content_json = serde_json::to_string(content).unwrap_or_default();
2435 hash_short(&format!("{origin}\n{handling_mode}\n{content_json}"))
2436}
2437
2438fn hash_short(value: &str) -> String {
2439 let mut hasher = Sha256::new();
2440 hasher.update(value.as_bytes());
2441 let digest = hasher.finalize();
2442 to_hex(&digest[..8])
2443}
2444
/// Encodes `bytes` as lowercase hexadecimal, high nibble first.
fn to_hex(bytes: &[u8]) -> String {
    const HEX_DIGITS: &[u8; 16] = b"0123456789abcdef";
    bytes
        .iter()
        .flat_map(|&byte| [byte >> 4, byte & 0x0f])
        .map(|nibble| char::from(HEX_DIGITS[usize::from(nibble)]))
        .collect()
}
2454
/// Milliseconds since the Unix epoch by the wall clock.
///
/// Returns 0 if the system clock reports a time before the epoch.
fn current_time_ms() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        // `as u64` only truncates after ~584 million years of milliseconds.
        .map_or(0, |since_epoch| since_epoch.as_millis() as u64)
}
2461
2462fn runtime_registry_lock_error() -> ConsoleLogError {
2463 Box::new(std::io::Error::other(
2464 "console runtime registry lock poisoned",
2465 ))
2466}
2467
2468#[cfg(test)]
2469#[allow(clippy::expect_used)]
2470mod tests {
2471 use std::sync::Arc;
2472 use std::sync::atomic::{AtomicUsize, Ordering};
2473 use std::time::Duration;
2474 use std::time::Instant;
2475
2476 use futures::StreamExt;
2477 use meerkat::{AgentFactory, Config, build_ephemeral_service};
2478 use meerkat_client::types::LlmStream;
2479 use meerkat_client::{LlmClient, LlmDoneOutcome, LlmError, LlmEvent, LlmRequest, TestClient};
2480 use meerkat_core::{
2481 AppendSystemContextRequest, AppendSystemContextResult, CommsRuntime, EventStream,
2482 RunResult, SessionControlError, SessionError, SessionHistoryPage, SessionHistoryQuery,
2483 SessionId, SessionQuery, SessionService, SessionServiceCommsExt, SessionServiceControlExt,
2484 SessionServiceHistoryExt, SessionSummary, SessionView, StartTurnRequest, StopReason,
2485 StreamError,
2486 };
2487 use meerkat_mob::{MobDefinition, MobSessionService, MobStorage, SpawnMemberSpec};
2488 use serde_json::json;
2489
2490 use super::*;
2491 use crate::mob_handle_runtime::MobBootstrapSpec;
2492
    /// Test double for [`ConsoleLogStore`] that forwards every call to an
    /// [`InMemoryConsoleLogStore`] while counting source-watermark reads and writes.
    struct CountingConsoleLogStore {
        // Backing store that actually serves every request.
        inner: InMemoryConsoleLogStore,
        // Incremented on every `source_watermark` call.
        source_watermark_calls: AtomicUsize,
        // Incremented on every `record_source_watermark` call.
        record_watermark_calls: AtomicUsize,
    }
2498
2499 impl CountingConsoleLogStore {
2500 fn new() -> Self {
2501 Self {
2502 inner: InMemoryConsoleLogStore::new(),
2503 source_watermark_calls: AtomicUsize::new(0),
2504 record_watermark_calls: AtomicUsize::new(0),
2505 }
2506 }
2507
2508 fn source_watermark_calls(&self) -> usize {
2509 self.source_watermark_calls.load(Ordering::SeqCst)
2510 }
2511 }
2512
    /// LLM client stub whose stream yields a single text delta only after `delay`.
    /// It is used to check that console sends return before the turn completes.
    struct SlowTestClient {
        // How long the stream sleeps before emitting its text delta.
        delay: Duration,
    }
2516
2517 #[async_trait::async_trait]
2518 impl LlmClient for SlowTestClient {
2519 fn project_replay_messages(&self, messages: &[Message]) -> Result<Vec<Message>, LlmError> {
2520 Ok(messages.to_vec())
2521 }
2522
2523 fn stream<'a>(&'a self, _request: &'a LlmRequest) -> LlmStream<'a> {
2524 let delay = self.delay;
2525 let delayed_text = futures::stream::once(async move {
2526 tokio::time::sleep(delay).await;
2527 Ok(LlmEvent::TextDelta {
2528 delta: "slow ok".to_string(),
2529 meta: None,
2530 })
2531 });
2532 let done = futures::stream::once(async {
2533 Ok(LlmEvent::Done {
2534 outcome: LlmDoneOutcome::Success {
2535 stop_reason: StopReason::EndTurn,
2536 },
2537 })
2538 });
2539 Box::pin(delayed_text.chain(done))
2540 }
2541
2542 fn provider(&self) -> &'static str {
2543 "slow-test"
2544 }
2545
2546 async fn health_check(&self) -> Result<(), LlmError> {
2547 Ok(())
2548 }
2549 }
2550
    /// Session service wrapper that forwards to `inner` but sleeps `delay` before each
    /// `read_history` call. It records how many reads happened and how many overlapped.
    /// Counters are behind `Arc`, so clones share them.
    #[derive(Clone)]
    struct DelayedHistorySessionService {
        // Real service every call is forwarded to.
        inner: Arc<dyn MobSessionService>,
        // Artificial latency added before each history read.
        delay: Duration,
        // Total number of `read_history` calls.
        read_calls: Arc<AtomicUsize>,
        // History reads currently in flight.
        active_reads: Arc<AtomicUsize>,
        // Highest value `active_reads` has reached.
        max_active_reads: Arc<AtomicUsize>,
    }
2559
2560 impl DelayedHistorySessionService {
2561 fn new(inner: Arc<dyn MobSessionService>, delay: Duration) -> Self {
2562 Self {
2563 inner,
2564 delay,
2565 read_calls: Arc::new(AtomicUsize::new(0)),
2566 active_reads: Arc::new(AtomicUsize::new(0)),
2567 max_active_reads: Arc::new(AtomicUsize::new(0)),
2568 }
2569 }
2570
2571 fn read_calls(&self) -> usize {
2572 self.read_calls.load(Ordering::SeqCst)
2573 }
2574
2575 fn max_active_reads(&self) -> usize {
2576 self.max_active_reads.load(Ordering::SeqCst)
2577 }
2578 }
2579
2580 #[async_trait::async_trait]
2581 impl SessionService for DelayedHistorySessionService {
2582 async fn create_session(
2583 &self,
2584 req: meerkat_core::CreateSessionRequest,
2585 ) -> Result<RunResult, SessionError> {
2586 self.inner.create_session(req).await
2587 }
2588
2589 async fn start_turn(
2590 &self,
2591 id: &SessionId,
2592 req: StartTurnRequest,
2593 ) -> Result<RunResult, SessionError> {
2594 self.inner.start_turn(id, req).await
2595 }
2596
2597 async fn interrupt(&self, id: &SessionId) -> Result<(), SessionError> {
2598 self.inner.interrupt(id).await
2599 }
2600
2601 async fn read(&self, id: &SessionId) -> Result<SessionView, SessionError> {
2602 self.inner.read(id).await
2603 }
2604
2605 async fn list(&self, query: SessionQuery) -> Result<Vec<SessionSummary>, SessionError> {
2606 self.inner.list(query).await
2607 }
2608
2609 async fn archive(&self, id: &SessionId) -> Result<(), SessionError> {
2610 self.inner.archive(id).await
2611 }
2612
2613 async fn subscribe_session_events(
2614 &self,
2615 id: &SessionId,
2616 ) -> Result<EventStream, StreamError> {
2617 SessionService::subscribe_session_events(self.inner.as_ref(), id).await
2618 }
2619 }
2620
2621 #[async_trait::async_trait]
2622 impl SessionServiceCommsExt for DelayedHistorySessionService {
2623 async fn comms_runtime(&self, session_id: &SessionId) -> Option<Arc<dyn CommsRuntime>> {
2624 self.inner.comms_runtime(session_id).await
2625 }
2626 }
2627
2628 #[async_trait::async_trait]
2629 impl SessionServiceControlExt for DelayedHistorySessionService {
2630 async fn append_system_context(
2631 &self,
2632 id: &SessionId,
2633 req: AppendSystemContextRequest,
2634 ) -> Result<AppendSystemContextResult, SessionControlError> {
2635 self.inner.append_system_context(id, req).await
2636 }
2637 }
2638
2639 #[async_trait::async_trait]
2640 impl SessionServiceHistoryExt for DelayedHistorySessionService {
2641 async fn read_history(
2642 &self,
2643 id: &SessionId,
2644 query: SessionHistoryQuery,
2645 ) -> Result<SessionHistoryPage, SessionError> {
2646 self.read_calls.fetch_add(1, Ordering::SeqCst);
2647 let active = self.active_reads.fetch_add(1, Ordering::SeqCst) + 1;
2648 self.max_active_reads.fetch_max(active, Ordering::SeqCst);
2649 tokio::time::sleep(self.delay).await;
2650 let result = self.inner.read_history(id, query).await;
2651 self.active_reads.fetch_sub(1, Ordering::SeqCst);
2652 result
2653 }
2654 }
2655
2656 #[async_trait::async_trait]
2657 impl MobSessionService for DelayedHistorySessionService {
2658 fn supports_persistent_sessions(&self) -> bool {
2659 self.inner.supports_persistent_sessions()
2660 }
2661
2662 fn runtime_adapter(&self) -> Option<Arc<meerkat_runtime::MeerkatMachine>> {
2663 self.inner.runtime_adapter()
2664 }
2665
2666 async fn session_belongs_to_mob(
2667 &self,
2668 session_id: &SessionId,
2669 mob_id: &meerkat_mob::MobId,
2670 ) -> bool {
2671 self.inner.session_belongs_to_mob(session_id, mob_id).await
2672 }
2673
2674 async fn cancel_all_checkpointers(&self) {
2675 self.inner.cancel_all_checkpointers().await;
2676 }
2677
2678 async fn rearm_all_checkpointers(&self) {
2679 self.inner.rearm_all_checkpointers().await;
2680 }
2681 }
2682
2683 #[async_trait::async_trait]
2684 impl ConsoleLogStore for CountingConsoleLogStore {
2685 async fn append_if_absent(
2686 &self,
2687 frame: NewConsoleFrame,
2688 ) -> ConsoleLogResult<AppendOutcome> {
2689 self.inner.append_if_absent(frame).await
2690 }
2691
2692 async fn update_frame_status(
2693 &self,
2694 frame_id: &str,
2695 status: ConsoleFrameStatus,
2696 ) -> ConsoleLogResult<Option<ConsoleFrame>> {
2697 self.inner.update_frame_status(frame_id, status).await
2698 }
2699
2700 async fn query_frames(
2701 &self,
2702 query: ConsoleTimelineQuery,
2703 ) -> ConsoleLogResult<ConsoleTimelinePage> {
2704 self.inner.query_frames(query).await
2705 }
2706
2707 async fn frame_by_dedupe_key(
2708 &self,
2709 dedupe_key: &str,
2710 ) -> ConsoleLogResult<Option<ConsoleFrame>> {
2711 self.inner.frame_by_dedupe_key(dedupe_key).await
2712 }
2713
2714 async fn latest_cursor(&self) -> ConsoleLogResult<Option<ConsoleCursor>> {
2715 self.inner.latest_cursor().await
2716 }
2717
2718 async fn clear_frames(&self) -> ConsoleLogResult<()> {
2719 self.inner.clear_frames().await
2720 }
2721
2722 async fn record_source_watermark(
2723 &self,
2724 runtime_key: &str,
2725 source_kind: ConsoleFrameSourceKind,
2726 source_cursor: &str,
2727 ) -> ConsoleLogResult<()> {
2728 self.record_watermark_calls.fetch_add(1, Ordering::SeqCst);
2729 self.inner
2730 .record_source_watermark(runtime_key, source_kind, source_cursor)
2731 .await
2732 }
2733
2734 async fn source_watermark(
2735 &self,
2736 runtime_key: &str,
2737 source_kind: ConsoleFrameSourceKind,
2738 ) -> ConsoleLogResult<Option<String>> {
2739 self.source_watermark_calls.fetch_add(1, Ordering::SeqCst);
2740 self.inner.source_watermark(runtime_key, source_kind).await
2741 }
2742 }
2743
2744 async fn build_single_member_runtime() -> UnifiedRuntime {
2745 build_single_member_runtime_with_client(Arc::new(TestClient::default())).await
2746 }
2747
2748 async fn build_single_member_runtime_with_client(client: Arc<dyn LlmClient>) -> UnifiedRuntime {
2749 let definition = MobDefinition::from_toml(
2750 r#"
2751[mob]
2752id = "console-aggregator-perf-test"
2753
2754[profiles.worker]
2755model = "gpt-5.5"
2756external_addressable = true
2757
2758[profiles.worker.tools]
2759comms = true
2760"#,
2761 )
2762 .expect("definition parses");
2763 let runtime = UnifiedRuntime::builder()
2764 .definition(definition)
2765 .default_llm_client(client)
2766 .build()
2767 .await
2768 .expect("runtime builds");
2769 runtime
2770 .spawn(SpawnMemberSpec::from_wire(
2771 "worker".to_string(),
2772 "agent-a".to_string(),
2773 Some("You are agent-a.".into()),
2774 None,
2775 None,
2776 ))
2777 .await
2778 .expect("member spawns");
2779 runtime
2780 }
2781
2782 async fn build_empty_runtime(mob_id: &str) -> UnifiedRuntime {
2783 let definition = MobDefinition::from_toml(&format!(
2784 r#"
2785[mob]
2786id = "{mob_id}"
2787
2788[profiles.worker]
2789model = "gpt-5.5"
2790external_addressable = true
2791
2792[profiles.worker.tools]
2793comms = true
2794"#
2795 ))
2796 .expect("definition parses");
2797 UnifiedRuntime::builder()
2798 .definition(definition)
2799 .default_llm_client(Arc::new(TestClient::default()))
2800 .build()
2801 .await
2802 .expect("runtime builds")
2803 }
2804
2805 async fn build_stress_runtime(
2806 member_count: usize,
2807 history_delay: Duration,
2808 ) -> (
2809 tempfile::TempDir,
2810 Arc<UnifiedRuntime>,
2811 DelayedHistorySessionService,
2812 ) {
2813 let temp_dir = tempfile::tempdir().expect("temp dir");
2814 let session_path = temp_dir.path().join("sessions");
2815 std::fs::create_dir_all(&session_path).expect("session path");
2816 let factory = AgentFactory::new(&session_path).comms(true);
2817 let base_service = Arc::new(build_ephemeral_service(
2818 factory,
2819 Config::default(),
2820 member_count + 8,
2821 ));
2822 let delayed_service = DelayedHistorySessionService::new(base_service, history_delay);
2823 let session_service: Arc<dyn MobSessionService> = Arc::new(delayed_service.clone());
2824 let definition = MobDefinition::from_toml(
2825 r#"
2826[mob]
2827id = "console-aggregator-stress-test"
2828
2829[profiles.worker]
2830model = "gpt-5.5"
2831external_addressable = true
2832
2833[profiles.worker.tools]
2834comms = true
2835"#,
2836 )
2837 .expect("definition parses");
2838 let spec = MobBootstrapSpec::new(definition, MobStorage::in_memory(), session_service)
2839 .with_options(crate::mob_handle_runtime::MobBootstrapOptions {
2840 allow_ephemeral_sessions: true,
2841 notify_orchestrator_on_resume: true,
2842 default_llm_client: Some(Arc::new(TestClient::default())),
2843 });
2844 let runtime = Arc::new(
2845 UnifiedRuntime::bootstrap(
2846 spec,
2847 crate::types::MobKitConfig {
2848 modules: Vec::new(),
2849 discovery: crate::types::DiscoverySpec {
2850 namespace: "stress".to_string(),
2851 modules: Vec::new(),
2852 },
2853 pre_spawn: Vec::new(),
2854 },
2855 Duration::from_secs(5),
2856 )
2857 .await
2858 .expect("runtime boots"),
2859 );
2860 for idx in 0..member_count {
2861 runtime
2862 .spawn(SpawnMemberSpec::from_wire(
2863 "worker".to_string(),
2864 format!("agent-{idx}"),
2865 Some(format!("You are agent-{idx}.").into()),
2866 None,
2867 None,
2868 ))
2869 .await
2870 .expect("member spawns");
2871 }
2872 (temp_dir, runtime, delayed_service)
2873 }
2874
2875 fn runtime_entry_for_test(runtime_key: &str, runtime: &UnifiedRuntime) -> RuntimeEntry {
2876 RuntimeEntry {
2877 runtime_key: runtime_key.to_string(),
2878 identity_namespace: "test".to_string(),
2879 runtime: runtime.mob_runtime().clone(),
2880 console_events: runtime.console_events(),
2881 visibility_policy: Arc::new(AllowAllConsoleVisibilityPolicy),
2882 }
2883 }
2884
2885 fn identity_record_for_test(identity: &str) -> ConsoleIdentityRecord {
2886 ConsoleIdentityRecord {
2887 identity: identity.to_string(),
2888 display_name: identity.to_string(),
2889 runtime_key: "runtime-cache".to_string(),
2890 runtime_member_id: identity.to_string(),
2891 session_id: Some(format!("session-{identity}")),
2892 visibility: ConsoleVisibility::Addressable,
2893 addressable: true,
2894 health: "ready".to_string(),
2895 labels: BTreeMap::new(),
2896 }
2897 }
2898
    /// A member spawned with an `agent_identity` label is listed under that durable
    /// identity. Its `runtime_member_id` keeps the runtime-assigned member id.
    /// `inspect_identity` on the durable identity resolves back to the same member.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn identity_record_uses_durable_agent_identity_label_for_identity_first_members()
    -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let runtime = build_empty_runtime("identity-label-test").await;
        let mut labels = BTreeMap::new();
        labels.insert(
            "agent_identity".to_string(),
            "channel:C0SMOKEOB3".to_string(),
        );
        labels.insert("display_name".to_string(), "C0SMOKEOB3".to_string());
        // The runtime member id deliberately differs from the durable identity label.
        runtime
            .spawn(
                SpawnMemberSpec::from_wire(
                    "worker".to_string(),
                    "rt:channel:C0SMOKEOB3:0".to_string(),
                    Some("You are C0SMOKEOB3.".into()),
                    None,
                    None,
                )
                .with_labels(labels),
            )
            .await
            .expect("member spawns");

        // Empty namespace, so the exposed identity is the bare label value.
        let entry = RuntimeEntry {
            runtime_key: "runtime-a".to_string(),
            identity_namespace: String::new(),
            runtime: runtime.mob_runtime().clone(),
            console_events: runtime.console_events(),
            visibility_policy: Arc::new(AllowAllConsoleVisibilityPolicy),
        };
        let aggregator = MobKitConsoleAggregator::in_memory();
        aggregator
            .inner
            .runtimes
            .write()
            .expect("runtime registry")
            .insert("runtime-a".to_string(), entry);

        let records = aggregator.list_identities_fresh().await?;
        let record = records
            .iter()
            .find(|record| record.identity == "channel:C0SMOKEOB3")
            .expect("durable identity is exposed");
        assert_eq!(record.runtime_member_id, "rt:channel:C0SMOKEOB3:0");

        let inspection = aggregator
            .inspect_identity("channel:C0SMOKEOB3")
            .await?
            .expect("durable identity resolves back to runtime member");
        assert_eq!(inspection.identity.identity, "channel:C0SMOKEOB3");
        assert_eq!(
            inspection.identity.runtime_member_id,
            "rt:channel:C0SMOKEOB3:0"
        );

        // Best-effort shutdown; the result is intentionally ignored.
        let _ = runtime.mob_handle().stop().await;
        Ok(())
    }
2958
    /// Once the identity read model is primed, `list_identities` answers from the cached
    /// records within 50 ms, even while another holder keeps the refresh lock.
    #[tokio::test]
    async fn list_identities_serves_hot_cache_while_identity_refresh_is_in_flight() {
        let aggregator = MobKitConsoleAggregator::in_memory();
        let record = identity_record_for_test("agent-cached");
        // Seed the read model and mark it primed.
        *aggregator.inner.identity_read_model.inner.write().await = vec![record.clone()];
        aggregator
            .inner
            .identity_read_model
            .primed
            .store(true, Ordering::Release);
        // Hold the refresh lock for the rest of the test to simulate an in-flight refresh.
        let _guard = aggregator
            .inner
            .identity_read_model
            .refresh_lock
            .clone()
            .lock_owned()
            .await;

        let identities =
            tokio::time::timeout(Duration::from_millis(50), aggregator.list_identities())
                .await
                .expect("hot identity list should not wait for refresh lock")
                .expect("identity list succeeds");

        assert_eq!(identities, vec![record]);
    }
2985
    /// With an unprimed read model, `list_identities` blocks on the in-flight refresh
    /// and then returns the records that refresh published.
    #[tokio::test]
    async fn list_identities_waits_for_inflight_identity_refresh_on_cold_cache() {
        let aggregator = MobKitConsoleAggregator::in_memory();
        // Simulate a refresh in progress before anyone asks for identities.
        let guard = aggregator
            .inner
            .identity_read_model
            .refresh_lock
            .clone()
            .lock_owned()
            .await;
        let waiter = tokio::spawn({
            let aggregator = aggregator.clone();
            async move { aggregator.list_identities().await }
        });

        tokio::time::sleep(Duration::from_millis(20)).await;
        assert!(
            !waiter.is_finished(),
            "cold identity list should wait for the in-flight refresh to finish"
        );

        // Publish the "refresh" result, mark the model primed, then release the lock.
        let record = identity_record_for_test("agent-primed");
        *aggregator.inner.identity_read_model.inner.write().await = vec![record.clone()];
        aggregator
            .inner
            .identity_read_model
            .primed
            .store(true, Ordering::Release);
        drop(guard);

        let identities = tokio::time::timeout(Duration::from_secs(1), waiter)
            .await
            .expect("cold identity list waiter should resume")
            .expect("waiter joins")
            .expect("identity list succeeds");
        assert_eq!(identities, vec![record]);
    }
3023
    /// A frame appended directly to the aggregator's store is returned by
    /// `query_timeline` for its identity. No runtimes are registered in this test.
    #[tokio::test]
    async fn query_timeline_reads_from_aggregate_store() {
        let aggregator = MobKitConsoleAggregator::in_memory();
        let frame = NewConsoleFrame {
            id: None,
            dedupe_key: "event-1".to_string(),
            timestamp_ms: 1,
            runtime_key: "runtime-a".to_string(),
            identity: "agent-a".to_string(),
            conversation_id: Some("agent-a".to_string()),
            session_id: None,
            kind: "text_delta".to_string(),
            status: ConsoleFrameStatus::Delivered,
            payload: json!({ "delta": "hello" }),
            source: ConsoleFrameSource {
                kind: ConsoleFrameSourceKind::ConsoleEvent,
                source_cursor: None,
            },
            source_event_id: Some("event-1".to_string()),
            interaction_id: None,
            turn_id: None,
            run_id: None,
            parent_frame_id: None,
            caused_by_frame_id: None,
        };
        aggregator
            .store()
            .append_if_absent(frame)
            .await
            .expect("append frame");

        let page = aggregator
            .query_timeline(ConsoleTimelineQuery {
                identity: Some("agent-a".to_string()),
                limit: 10,
                ..ConsoleTimelineQuery::default()
            })
            .await
            .expect("query timeline");
        assert_eq!(page.frames.len(), 1);
        assert_eq!(page.frames[0].kind, "text_delta");
    }
3066
    /// With a runtime registered, `query_timeline` answers from the store within 250 ms.
    /// It must not read any source watermark, which the counting store verifies.
    #[tokio::test]
    async fn query_timeline_is_store_local_for_registered_runtimes() {
        let store = Arc::new(CountingConsoleLogStore::new());
        let aggregator = MobKitConsoleAggregator::new(store.clone());
        let runtime = build_single_member_runtime().await;
        aggregator
            .inner
            .runtimes
            .write()
            .expect("runtime registry")
            .insert(
                "runtime-a".to_string(),
                runtime_entry_for_test("runtime-a", &runtime),
            );
        store
            .append_if_absent(NewConsoleFrame {
                id: None,
                dedupe_key: "event-1".to_string(),
                timestamp_ms: 1,
                runtime_key: "runtime-a".to_string(),
                identity: "agent-a".to_string(),
                conversation_id: Some("agent-a".to_string()),
                session_id: None,
                kind: "text_delta".to_string(),
                status: ConsoleFrameStatus::Delivered,
                payload: json!({ "delta": "hello" }),
                source: ConsoleFrameSource {
                    kind: ConsoleFrameSourceKind::ConsoleEvent,
                    source_cursor: None,
                },
                source_event_id: Some("event-1".to_string()),
                interaction_id: None,
                turn_id: None,
                run_id: None,
                parent_frame_id: None,
                caused_by_frame_id: None,
            })
            .await
            .expect("append frame");

        let page = tokio::time::timeout(
            Duration::from_millis(250),
            aggregator.query_timeline(ConsoleTimelineQuery {
                identity: Some("agent-a".to_string()),
                limit: 10,
                ..ConsoleTimelineQuery::default()
            }),
        )
        .await
        .expect("timeline query should not wait for session history")
        .expect("timeline query succeeds");

        assert_eq!(page.frames.len(), 1);
        assert_eq!(
            store.source_watermark_calls(),
            0,
            "query_timeline must not synchronously touch session-history watermarks"
        );
        // Best-effort shutdown; the result is intentionally ignored.
        let _ = runtime.mob_handle().stop().await;
    }
3127
    /// `send` resolves with an `Accepted` frame well before a 2 s LLM turn finishes.
    /// The background dispatch still completes and projects the reply into history.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn console_send_returns_after_acceptance_without_waiting_for_turn_completion() {
        let runtime = build_single_member_runtime_with_client(Arc::new(SlowTestClient {
            delay: Duration::from_secs(2),
        }))
        .await;
        let aggregator = MobKitConsoleAggregator::in_memory();
        aggregator
            .inner
            .runtimes
            .write()
            .expect("runtime registry")
            .insert(
                "runtime-a".to_string(),
                runtime_entry_for_test("runtime-a", &runtime),
            );

        let start = Instant::now();
        // "test/" is the namespace assigned by runtime_entry_for_test.
        let accepted = tokio::time::timeout(
            Duration::from_millis(300),
            aggregator.send(ConsoleSendRequest {
                identity: "test/agent-a".to_string(),
                content: json!("hello slow agent"),
                origin: "console:test".to_string(),
                idempotency_key: "nonblocking-send".to_string(),
                handling_mode: Some("queue".to_string()),
            }),
        )
        .await
        .expect("console send should return once the input is accepted")
        .expect("send succeeds");

        assert_eq!(accepted.status, ConsoleFrameStatus::Accepted);
        assert!(
            start.elapsed() < Duration::from_secs(1),
            "console send should not wait for the delayed LLM turn"
        );

        // "slow ok" is the delta SlowTestClient emits after its delay.
        wait_for_session_history_text(
            &aggregator,
            "test/agent-a",
            "slow ok",
            Duration::from_secs(5),
        )
        .await
        .expect("background dispatch should still complete and project history");
        // Best-effort shutdown; the result is intentionally ignored.
        let _ = runtime.mob_handle().stop().await;
    }
3176
    /// A member spawned after the runtime is registered is discovered on its own.
    /// Its session history (including the system prompt) is backfilled without an
    /// explicit refresh call.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn discovered_late_member_session_backfills_without_manual_refresh() -> Result<(), String>
    {
        let runtime = Arc::new(build_empty_runtime("console-aggregator-late-member-test").await);
        let aggregator = MobKitConsoleAggregator::in_memory();
        // Register while the mob has no members yet.
        aggregator.register_runtime(ConsoleRuntimeRegistration {
            runtime_key: "runtime-late".to_string(),
            runtime: runtime.clone(),
            identity_namespace: "late".to_string(),
            visibility_policy: Arc::new(AllowAllConsoleVisibilityPolicy),
        });

        runtime
            .spawn(SpawnMemberSpec::from_wire(
                "worker".to_string(),
                "agent-late".to_string(),
                Some("You are agent-late.".into()),
                None,
                None,
            ))
            .await
            .expect("late member spawns");
        // Send directly through the mob handle, bypassing the aggregator entirely.
        let session_id = send_message_on_mob_with_mode(
            &runtime.mob_handle(),
            "agent-late",
            ContentInput::Text("hello after registration".to_string()),
            meerkat_core::types::HandlingMode::Queue,
        )
        .await
        .expect("direct member send succeeds");
        wait_for_identity_record(
            &aggregator,
            "late/agent-late",
            Some(session_id.as_str()),
            Duration::from_secs(5),
        )
        .await?;

        wait_for_session_history_text(
            &aggregator,
            "late/agent-late",
            "You are agent-late.",
            Duration::from_secs(5),
        )
        .await?;
        // Best-effort shutdown; the result is intentionally ignored.
        let _ = runtime.mob_handle().stop().await;
        Ok(())
    }
3225
3226 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
3227 async fn explicit_empty_identity_query_force_refreshes_past_fresh_watermark()
3228 -> Result<(), String> {
3229 let runtime = build_single_member_runtime().await;
3230 let entry = runtime_entry_for_test("runtime-a", &runtime);
3231 let resolved = member_sources_for_entry(&entry)
3232 .await
3233 .into_iter()
3234 .find(|candidate| candidate.member.agent_identity.as_str() == "agent-a")
3235 .expect("agent-a member exists");
3236 let record = identity_record_for_member(&entry, &resolved.handle, &resolved.member)
3237 .await
3238 .expect("identity record exists");
3239 let session_id = record.session_id.expect("agent-a has a session");
3240
3241 let aggregator = MobKitConsoleAggregator::in_memory();
3242 aggregator
3243 .inner
3244 .runtimes
3245 .write()
3246 .expect("runtime registry")
3247 .insert("runtime-a".to_string(), entry);
3248 let watermark_key = session_history_watermark_runtime_key("runtime-a", &session_id);
3249 aggregator
3250 .store()
3251 .record_source_watermark(
3252 &watermark_key,
3253 ConsoleFrameSourceKind::SessionHistory,
3254 &format_session_history_watermark(&session_id, 0, current_time_ms()),
3255 )
3256 .await
3257 .expect("record fresh empty watermark");
3258
3259 let page = tokio::time::timeout(
3260 Duration::from_secs(2),
3261 aggregator.query_timeline(ConsoleTimelineQuery {
3262 identity: Some("test/agent-a".to_string()),
3263 limit: 20,
3264 ..ConsoleTimelineQuery::default()
3265 }),
3266 )
3267 .await
3268 .expect("explicit identity query should not stall")
3269 .expect("query succeeds");
3270
3271 assert!(
3272 page.frames.iter().any(|frame| {
3273 frame.source.kind == ConsoleFrameSourceKind::SessionHistory
3274 && frame.kind == "user_input"
3275 && session_history_content_text(frame).as_deref() == Some("You are agent-a.")
3276 }),
3277 "explicit identity query should force-refresh stale/fresh empty watermarks; frames: {:#?}",
3278 page.frames
3279 );
3280 let _ = runtime.mob_handle().stop().await;
3281 Ok(())
3282 }
3283
3284 async fn wait_for_session_history_text(
3285 aggregator: &MobKitConsoleAggregator,
3286 identity: &str,
3287 expected: &str,
3288 timeout: Duration,
3289 ) -> Result<(), String> {
3290 let deadline = Instant::now() + timeout;
3291 let mut observed = Vec::new();
3292 while Instant::now() < deadline {
3293 let _ = aggregator.list_identities().await;
3294 let page = aggregator
3295 .query_timeline(ConsoleTimelineQuery {
3296 identity: Some(identity.to_string()),
3297 limit: 20,
3298 ..ConsoleTimelineQuery::default()
3299 })
3300 .await
3301 .expect("query timeline");
3302 observed = page.frames;
3303 if observed.iter().any(|frame| {
3304 frame.source.kind == ConsoleFrameSourceKind::SessionHistory
3305 && (frame.kind == "user_input" || frame.kind == "interaction_complete")
3306 && session_history_content_text(frame).as_deref() == Some(expected)
3307 }) {
3308 return Ok(());
3309 }
3310 tokio::time::sleep(Duration::from_millis(25)).await;
3311 }
3312
3313 Err(format!(
3314 "session history text {expected:?} was not backfilled; observed frames: {observed:#?}",
3315 ))
3316 }
3317
3318 async fn wait_for_identity_record(
3319 aggregator: &MobKitConsoleAggregator,
3320 identity: &str,
3321 session_id: Option<&str>,
3322 timeout: Duration,
3323 ) -> Result<(), String> {
3324 let deadline = Instant::now() + timeout;
3325 let mut observed = Vec::new();
3326 while Instant::now() < deadline {
3327 observed = aggregator
3328 .list_identities()
3329 .await
3330 .map_err(|err| err.to_string())?;
3331 if observed.iter().any(|record| {
3332 record.identity == identity && record.session_id.as_deref() == session_id
3333 }) {
3334 return Ok(());
3335 }
3336 tokio::time::sleep(Duration::from_millis(25)).await;
3337 }
3338
3339 Err(format!(
3340 "identity {identity:?} with session {session_id:?} was not projected; observed identities: {observed:#?}",
3341 ))
3342 }
3343
3344 fn session_history_content_text(frame: &ConsoleFrame) -> Option<String> {
3345 if let Some(text) = frame.payload.get("text").and_then(Value::as_str) {
3346 return Some(text.to_string());
3347 }
3348 if let Some(text) = frame.payload.get("result").and_then(Value::as_str) {
3349 return Some(text.to_string());
3350 }
3351 match frame.payload.get("content")? {
3352 Value::String(text) => Some(text.clone()),
3353 Value::Array(blocks) => Some(
3354 blocks
3355 .iter()
3356 .filter_map(|block| block.get("text").and_then(Value::as_str))
3357 .collect::<Vec<_>>()
3358 .join(""),
3359 ),
3360 _ => None,
3361 }
3362 }
3363
3364 #[tokio::test]
3365 async fn query_timeline_handles_large_store_without_backfill_calls() {
3366 let store = Arc::new(CountingConsoleLogStore::new());
3367 let aggregator = MobKitConsoleAggregator::new(store.clone());
3368 for idx in 0..5_000 {
3369 store
3370 .append_if_absent(NewConsoleFrame {
3371 id: None,
3372 dedupe_key: format!("event-{idx}"),
3373 timestamp_ms: idx,
3374 runtime_key: "runtime-a".to_string(),
3375 identity: "agent-a".to_string(),
3376 conversation_id: Some("agent-a".to_string()),
3377 session_id: Some("session-a".to_string()),
3378 kind: "text_delta".to_string(),
3379 status: ConsoleFrameStatus::Delivered,
3380 payload: json!({ "delta": idx }),
3381 source: ConsoleFrameSource {
3382 kind: ConsoleFrameSourceKind::ConsoleEvent,
3383 source_cursor: None,
3384 },
3385 source_event_id: Some(format!("event-{idx}")),
3386 interaction_id: None,
3387 turn_id: None,
3388 run_id: None,
3389 parent_frame_id: None,
3390 caused_by_frame_id: None,
3391 })
3392 .await
3393 .expect("append frame");
3394 }
3395
3396 let page = aggregator
3397 .query_timeline(ConsoleTimelineQuery {
3398 identity: Some("agent-a".to_string()),
3399 limit: 1_000,
3400 ..ConsoleTimelineQuery::default()
3401 })
3402 .await
3403 .expect("large query");
3404
3405 assert_eq!(page.frames.len(), 1_000);
3406 assert_eq!(store.source_watermark_calls(), 0);
3407 }
3408
3409 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
3410 async fn explicit_identity_timeline_query_does_not_resolve_roster_per_frame() {
3411 let store = Arc::new(CountingConsoleLogStore::new());
3412 let aggregator = MobKitConsoleAggregator::new(store.clone());
3413 let (_temp, runtime, _delayed_service) =
3414 build_stress_runtime(64, Duration::from_millis(0)).await;
3415 aggregator
3416 .inner
3417 .runtimes
3418 .write()
3419 .expect("runtime registry")
3420 .insert(
3421 "runtime-a".to_string(),
3422 runtime_entry_for_test("runtime-a", runtime.as_ref()),
3423 );
3424 for idx in 0..1_000 {
3425 store
3426 .append_if_absent(NewConsoleFrame {
3427 id: None,
3428 dedupe_key: format!("event-{idx}"),
3429 timestamp_ms: idx,
3430 runtime_key: "runtime-a".to_string(),
3431 identity: "test/agent-0".to_string(),
3432 conversation_id: Some("test/agent-0".to_string()),
3433 session_id: Some("session-a".to_string()),
3434 kind: "text_delta".to_string(),
3435 status: ConsoleFrameStatus::Delivered,
3436 payload: json!({ "delta": idx }),
3437 source: ConsoleFrameSource {
3438 kind: ConsoleFrameSourceKind::ConsoleEvent,
3439 source_cursor: None,
3440 },
3441 source_event_id: Some(format!("event-{idx}")),
3442 interaction_id: None,
3443 turn_id: None,
3444 run_id: None,
3445 parent_frame_id: None,
3446 caused_by_frame_id: None,
3447 })
3448 .await
3449 .expect("append frame");
3450 }
3451
3452 let page = tokio::time::timeout(
3453 Duration::from_secs(2),
3454 aggregator.query_timeline(ConsoleTimelineQuery {
3455 identity: Some("test/agent-0".to_string()),
3456 limit: 1_000,
3457 ..ConsoleTimelineQuery::default()
3458 }),
3459 )
3460 .await
3461 .expect("identity timeline query should not rediscover the roster per frame")
3462 .expect("timeline query succeeds");
3463
3464 assert_eq!(page.frames.len(), 1_000);
3465 assert_eq!(store.source_watermark_calls(), 0);
3466 let _ = runtime.mob_handle().stop().await;
3467 }
3468
3469 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
3470 async fn refresh_session_history_parallelizes_slow_member_backfills_at_scale() {
3471 const MEMBER_COUNT: usize = 32;
3472 let (_temp, runtime, delayed_service) =
3473 build_stress_runtime(MEMBER_COUNT, Duration::from_millis(40)).await;
3474 let aggregator = MobKitConsoleAggregator::in_memory();
3475 aggregator
3476 .inner
3477 .runtimes
3478 .write()
3479 .expect("runtime registry")
3480 .insert(
3481 "runtime-stress".to_string(),
3482 runtime_entry_for_test("runtime-stress", &runtime),
3483 );
3484
3485 let started = Instant::now();
3486 aggregator
3487 .refresh_session_history()
3488 .await
3489 .expect("stress refresh");
3490 let elapsed = started.elapsed();
3491
3492 assert!(
3493 delayed_service.read_calls() >= MEMBER_COUNT,
3494 "expected at least one history read per member, saw {}",
3495 delayed_service.read_calls()
3496 );
3497 assert!(
3498 delayed_service.max_active_reads() > 1,
3499 "session history backfill should fan out instead of reading members serially"
3500 );
3501 assert!(
3502 delayed_service.max_active_reads()
3503 <= ConsoleAggregatorOptions::default().max_concurrent_session_backfills,
3504 "session history backfill should respect the default concurrency limit"
3505 );
3506 assert!(
3507 elapsed < Duration::from_millis(600),
3508 "parallel backfill should be far below serial {}ms path, elapsed: {elapsed:?}",
3509 MEMBER_COUNT * 40
3510 );
3511 let _ = runtime.mob_handle().stop().await;
3512 }
3513
3514 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
3515 async fn session_history_backfill_respects_configured_concurrency_limit() {
3516 const MEMBER_COUNT: usize = 16;
3517 let (_temp, runtime, delayed_service) =
3518 build_stress_runtime(MEMBER_COUNT, Duration::from_millis(30)).await;
3519 let aggregator =
3520 MobKitConsoleAggregator::in_memory_with_options(ConsoleAggregatorOptions {
3521 max_concurrent_session_backfills: 4,
3522 ..ConsoleAggregatorOptions::default()
3523 });
3524 aggregator
3525 .inner
3526 .runtimes
3527 .write()
3528 .expect("runtime registry")
3529 .insert(
3530 "runtime-stress".to_string(),
3531 runtime_entry_for_test("runtime-stress", &runtime),
3532 );
3533
3534 aggregator
3535 .refresh_session_history()
3536 .await
3537 .expect("stress refresh");
3538
3539 assert!(
3540 delayed_service.max_active_reads() <= 4,
3541 "configured concurrency limit should bound session history reads, saw {}",
3542 delayed_service.max_active_reads()
3543 );
3544 let _ = runtime.mob_handle().stop().await;
3545 }
3546
3547 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
3548 async fn live_event_burst_reaches_store_while_slow_backfill_is_running() {
3549 const MEMBER_COUNT: usize = 24;
3550 const LIVE_EVENT_COUNT: usize = 2_048;
3551 let (_temp, runtime, _delayed_service) =
3552 build_stress_runtime(MEMBER_COUNT, Duration::from_millis(200)).await;
3553 let aggregator = MobKitConsoleAggregator::in_memory();
3554 aggregator.register_runtime(ConsoleRuntimeRegistration {
3555 runtime_key: "runtime-burst".to_string(),
3556 runtime: runtime.clone(),
3557 identity_namespace: "stress".to_string(),
3558 visibility_policy: Arc::new(AllowAllConsoleVisibilityPolicy),
3559 });
3560 let console_events = runtime.console_events();
3561 for idx in 0..LIVE_EVENT_COUNT {
3562 console_events
3563 .append(
3564 "agent-0",
3565 Some("burst-turn".to_string()),
3566 "text_delta",
3567 json!({ "delta": format!("frame-{idx}") }),
3568 )
3569 .await;
3570 }
3571
3572 let deadline = Instant::now() + Duration::from_secs(5);
3573 let mut observed = 0;
3574 while Instant::now() < deadline {
3575 observed = count_console_event_frames(&aggregator, "stress/agent-0").await;
3576 if observed >= LIVE_EVENT_COUNT {
3577 break;
3578 }
3579 tokio::time::sleep(Duration::from_millis(25)).await;
3580 }
3581
3582 assert_eq!(
3583 observed, LIVE_EVENT_COUNT,
3584 "live pump should not drop frames while slow background backfill is running"
3585 );
3586 let _ = runtime.mob_handle().stop().await;
3587 }
3588
3589 async fn count_console_event_frames(
3590 aggregator: &MobKitConsoleAggregator,
3591 identity: &str,
3592 ) -> usize {
3593 let mut after = None;
3594 let mut count = 0;
3595 loop {
3596 let page = aggregator
3597 .query_timeline(ConsoleTimelineQuery {
3598 identity: Some(identity.to_string()),
3599 after,
3600 limit: 1_000,
3601 ..ConsoleTimelineQuery::default()
3602 })
3603 .await
3604 .expect("query burst timeline");
3605 if page.frames.is_empty() {
3606 break;
3607 }
3608 count += page
3609 .frames
3610 .iter()
3611 .filter(|frame| frame.source.kind == ConsoleFrameSourceKind::ConsoleEvent)
3612 .count();
3613 after = page.next_cursor;
3614 if after.is_none() {
3615 break;
3616 }
3617 }
3618 count
3619 }
3620
3621 #[tokio::test]
3622 async fn status_updates_get_replayable_aggregate_cursors() {
3623 let aggregator = MobKitConsoleAggregator::in_memory();
3624 let frame = NewConsoleFrame {
3625 id: None,
3626 dedupe_key: "send-1".to_string(),
3627 timestamp_ms: 1,
3628 runtime_key: "runtime-a".to_string(),
3629 identity: "agent-a".to_string(),
3630 conversation_id: Some("agent-a".to_string()),
3631 session_id: Some("session-1".to_string()),
3632 kind: "user_input".to_string(),
3633 status: ConsoleFrameStatus::Accepted,
3634 payload: json!({ "content": "hello" }),
3635 source: ConsoleFrameSource {
3636 kind: ConsoleFrameSourceKind::Send,
3637 source_cursor: None,
3638 },
3639 source_event_id: None,
3640 interaction_id: Some("interaction-1".to_string()),
3641 turn_id: None,
3642 run_id: None,
3643 parent_frame_id: None,
3644 caused_by_frame_id: None,
3645 };
3646 let inserted = aggregator
3647 .store()
3648 .append_if_absent(frame)
3649 .await
3650 .expect("append frame");
3651
3652 update_frame_status_and_emit(
3653 &aggregator.inner,
3654 &inserted.frame.id,
3655 ConsoleFrameStatus::Delivered,
3656 )
3657 .await
3658 .expect("update status");
3659
3660 let page = aggregator
3661 .query_timeline(ConsoleTimelineQuery {
3662 identity: Some("agent-a".to_string()),
3663 after: Some(inserted.frame.cursor.clone()),
3664 limit: 10,
3665 ..ConsoleTimelineQuery::default()
3666 })
3667 .await
3668 .expect("query timeline");
3669 assert_eq!(page.frames.len(), 1);
3670 assert_eq!(page.frames[0].kind, "frame_updated");
3671 assert_eq!(page.frames[0].parent_frame_id, Some(inserted.frame.id));
3672 assert_eq!(
3673 page.frames[0]
3674 .payload
3675 .get("frame")
3676 .and_then(|frame| frame.get("status"))
3677 .and_then(Value::as_str),
3678 Some("delivered")
3679 );
3680 }
3681
3682 #[tokio::test]
3683 async fn history_counterpart_scan_is_not_capped_to_one_page() {
3684 let aggregator = MobKitConsoleAggregator::in_memory();
3685 for idx in 0..1_005 {
3686 aggregator
3687 .store()
3688 .append_if_absent(NewConsoleFrame {
3689 id: None,
3690 dedupe_key: format!("filler-{idx}"),
3691 timestamp_ms: idx,
3692 runtime_key: "runtime-a".to_string(),
3693 identity: "agent-a".to_string(),
3694 conversation_id: Some("agent-a".to_string()),
3695 session_id: Some("session-a".to_string()),
3696 kind: "text_delta".to_string(),
3697 status: ConsoleFrameStatus::Completed,
3698 payload: json!({ "delta": idx }),
3699 source: ConsoleFrameSource {
3700 kind: ConsoleFrameSourceKind::ConsoleEvent,
3701 source_cursor: None,
3702 },
3703 source_event_id: Some(format!("filler-{idx}")),
3704 interaction_id: None,
3705 turn_id: None,
3706 run_id: None,
3707 parent_frame_id: None,
3708 caused_by_frame_id: None,
3709 })
3710 .await
3711 .expect("append filler");
3712 }
3713 aggregator
3714 .store()
3715 .append_if_absent(NewConsoleFrame {
3716 id: None,
3717 dedupe_key: "live-user-input".to_string(),
3718 timestamp_ms: 2_000,
3719 runtime_key: "runtime-a".to_string(),
3720 identity: "agent-a".to_string(),
3721 conversation_id: Some("agent-a".to_string()),
3722 session_id: Some("session-a".to_string()),
3723 kind: "user_input".to_string(),
3724 status: ConsoleFrameStatus::Delivered,
3725 payload: json!({ "content": "already here" }),
3726 source: ConsoleFrameSource {
3727 kind: ConsoleFrameSourceKind::ConsoleEvent,
3728 source_cursor: None,
3729 },
3730 source_event_id: Some("live-user-input".to_string()),
3731 interaction_id: None,
3732 turn_id: None,
3733 run_id: None,
3734 parent_frame_id: None,
3735 caused_by_frame_id: None,
3736 })
3737 .await
3738 .expect("append live input");
3739
3740 let history = NewConsoleFrame {
3741 id: None,
3742 dedupe_key: "history-user-input".to_string(),
3743 timestamp_ms: 3_000,
3744 runtime_key: "runtime-a".to_string(),
3745 identity: "agent-a".to_string(),
3746 conversation_id: Some("agent-a".to_string()),
3747 session_id: Some("session-a".to_string()),
3748 kind: "user_input".to_string(),
3749 status: ConsoleFrameStatus::Completed,
3750 payload: json!({ "content": "already here" }),
3751 source: ConsoleFrameSource {
3752 kind: ConsoleFrameSourceKind::SessionHistory,
3753 source_cursor: Some("session-a:1006".to_string()),
3754 },
3755 source_event_id: None,
3756 interaction_id: None,
3757 turn_id: None,
3758 run_id: None,
3759 parent_frame_id: None,
3760 caused_by_frame_id: None,
3761 };
3762
3763 assert!(
3764 history_frame_has_existing_counterpart(&aggregator.inner, &history)
3765 .await
3766 .expect("counterpart scan")
3767 );
3768 }
3769
3770 #[tokio::test]
3771 async fn history_counterpart_scan_matches_rpc_wrapped_user_prompts() {
3772 let aggregator = MobKitConsoleAggregator::in_memory();
3773 aggregator
3774 .store()
3775 .append_if_absent(NewConsoleFrame {
3776 id: None,
3777 dedupe_key: "live-user-input".to_string(),
3778 timestamp_ms: 2_000,
3779 runtime_key: "runtime-a".to_string(),
3780 identity: "agent-a".to_string(),
3781 conversation_id: Some("agent-a".to_string()),
3782 session_id: Some("session-a".to_string()),
3783 kind: "user_input".to_string(),
3784 status: ConsoleFrameStatus::Delivered,
3785 payload: json!({ "content": "hello from operator" }),
3786 source: ConsoleFrameSource {
3787 kind: ConsoleFrameSourceKind::ConsoleEvent,
3788 source_cursor: None,
3789 },
3790 source_event_id: Some("live-user-input".to_string()),
3791 interaction_id: None,
3792 turn_id: None,
3793 run_id: None,
3794 parent_frame_id: None,
3795 caused_by_frame_id: None,
3796 })
3797 .await
3798 .expect("append live input");
3799
3800 let history = NewConsoleFrame {
3801 id: None,
3802 dedupe_key: "history-user-input".to_string(),
3803 timestamp_ms: 3_000,
3804 runtime_key: "runtime-a".to_string(),
3805 identity: "agent-a".to_string(),
3806 conversation_id: Some("agent-a".to_string()),
3807 session_id: Some("session-a".to_string()),
3808 kind: "user_input".to_string(),
3809 status: ConsoleFrameStatus::Completed,
3810 payload: json!({ "content": "[EVENT via rpc] hello from operator" }),
3811 source: ConsoleFrameSource {
3812 kind: ConsoleFrameSourceKind::SessionHistory,
3813 source_cursor: Some("session-a:2".to_string()),
3814 },
3815 source_event_id: None,
3816 interaction_id: None,
3817 turn_id: None,
3818 run_id: None,
3819 parent_frame_id: None,
3820 caused_by_frame_id: None,
3821 };
3822
3823 assert!(
3824 history_frame_has_existing_counterpart(&aggregator.inner, &history)
3825 .await
3826 .expect("counterpart scan")
3827 );
3828 }
3829
3830 #[tokio::test]
3831 async fn history_counterpart_scan_matches_streamed_text_delta_completion() {
3832 let aggregator = MobKitConsoleAggregator::in_memory();
3833 for (idx, delta) in ["Ready ", "and standing by."].iter().enumerate() {
3834 aggregator
3835 .store()
3836 .append_if_absent(NewConsoleFrame {
3837 id: None,
3838 dedupe_key: format!("live-delta-{idx}"),
3839 timestamp_ms: 2_000 + idx as u64,
3840 runtime_key: "runtime-a".to_string(),
3841 identity: "agent-a".to_string(),
3842 conversation_id: Some("agent-a".to_string()),
3843 session_id: Some("session-a".to_string()),
3844 kind: "text_delta".to_string(),
3845 status: ConsoleFrameStatus::Delivered,
3846 payload: json!({ "delta": delta }),
3847 source: ConsoleFrameSource {
3848 kind: ConsoleFrameSourceKind::ConsoleEvent,
3849 source_cursor: None,
3850 },
3851 source_event_id: Some(format!("live-delta-{idx}")),
3852 interaction_id: Some("turn-a".to_string()),
3853 turn_id: None,
3854 run_id: None,
3855 parent_frame_id: None,
3856 caused_by_frame_id: None,
3857 })
3858 .await
3859 .expect("append live delta");
3860 }
3861
3862 let history = NewConsoleFrame {
3863 id: None,
3864 dedupe_key: "history-assistant-complete".to_string(),
3865 timestamp_ms: 3_000,
3866 runtime_key: "runtime-a".to_string(),
3867 identity: "agent-a".to_string(),
3868 conversation_id: Some("agent-a".to_string()),
3869 session_id: Some("session-a".to_string()),
3870 kind: "interaction_complete".to_string(),
3871 status: ConsoleFrameStatus::Completed,
3872 payload: json!({ "result": "Ready and standing by." }),
3873 source: ConsoleFrameSource {
3874 kind: ConsoleFrameSourceKind::SessionHistory,
3875 source_cursor: Some("session-a:3".to_string()),
3876 },
3877 source_event_id: None,
3878 interaction_id: None,
3879 turn_id: None,
3880 run_id: None,
3881 parent_frame_id: None,
3882 caused_by_frame_id: None,
3883 };
3884
3885 assert!(
3886 history_frame_has_existing_counterpart(&aggregator.inner, &history)
3887 .await
3888 .expect("counterpart scan")
3889 );
3890 }
3891
3892 #[test]
3893 fn session_history_watermark_key_is_session_scoped() {
3894 assert_ne!(
3895 session_history_watermark_runtime_key("runtime-a", "session-1"),
3896 session_history_watermark_runtime_key("runtime-a", "session-2")
3897 );
3898 }
3899
3900 #[test]
3901 fn session_history_watermarks_are_cursor_and_ttl_aware() {
3902 let legacy = "session:with:colon:42";
3903 let checked = format_session_history_watermark("session:with:colon", 43, 1_000);
3904 let empty_checked = format_session_history_watermark("session:with:colon", 0, 1_000);
3905
3906 assert_eq!(
3907 parse_session_history_watermark(legacy, "session:with:colon"),
3908 Some(42)
3909 );
3910 assert_eq!(
3911 parse_session_history_watermark(&checked, "session:with:colon"),
3912 Some(43)
3913 );
3914 assert!(session_history_watermark_is_fresh(
3915 &checked,
3916 "session:with:colon",
3917 1_500
3918 ));
3919 assert!(!session_history_watermark_is_fresh(
3920 &checked,
3921 "session:with:colon",
3922 1_000 + SESSION_HISTORY_GROWING_REFRESH_TTL_MS + 1
3923 ));
3924 assert!(session_history_watermark_is_fresh(
3925 &empty_checked,
3926 "session:with:colon",
3927 1_500
3928 ));
3929 assert!(!session_history_watermark_is_fresh(
3930 &empty_checked,
3931 "session:with:colon",
3932 1_000 + SESSION_HISTORY_REFRESH_TTL_MS + 1
3933 ));
3934 }
3935
3936 #[test]
3937 fn session_history_messages_project_to_renderable_frames() {
3938 let user = frame_from_session_history_message(
3939 "runtime-a",
3940 "agent-a",
3941 "session-a",
3942 0,
3943 json!({
3944 "role": "user",
3945 "content": "hello",
3946 "timestamp_ms": 10
3947 }),
3948 )
3949 .expect("user history frame");
3950 let assistant = frame_from_session_history_message(
3951 "runtime-a",
3952 "agent-a",
3953 "session-a",
3954 1,
3955 json!({
3956 "role": "assistant",
3957 "content": "hi there",
3958 "stop_reason": "end_turn",
3959 "usage": { "input_tokens": 1, "output_tokens": 1, "total_tokens": 2 },
3960 "timestamp_ms": 11
3961 }),
3962 )
3963 .expect("assistant history frame");
3964
3965 assert_eq!(user.kind, "user_input");
3966 assert_eq!(user.source.kind, ConsoleFrameSourceKind::SessionHistory);
3967 assert_eq!(
3968 user.payload["content"],
3969 json!([{ "type": "text", "text": "hello" }])
3970 );
3971 assert_eq!(assistant.kind, "interaction_complete");
3972 assert_eq!(assistant.payload["text"], json!("hi there"));
3973 assert!(
3974 assistant
3975 .dedupe_key
3976 .starts_with("session-history:runtime-a:session-a:1:")
3977 );
3978 }
3979
3980 #[test]
3981 fn session_history_projection_filters_scaffold_user_messages() {
3982 let spawn_notice = frame_from_session_history_message(
3983 "runtime-a",
3984 "agent-a",
3985 "session-a",
3986 0,
3987 json!({
3988 "role": "user",
3989 "content": "You have been spawned as 'agent-a' (role: worker) in mob 'mob-a'.",
3990 "timestamp_ms": 10
3991 }),
3992 );
3993 let peer_update = frame_from_session_history_message(
3994 "runtime-a",
3995 "agent-a",
3996 "session-a",
3997 1,
3998 json!({
3999 "role": "user",
4000 "content": [{ "type": "text", "text": "[PEER UPDATE] alpha wired to beta" }],
4001 "timestamp_ms": 11
4002 }),
4003 );
4004 let real_user = frame_from_session_history_message(
4005 "runtime-a",
4006 "agent-a",
4007 "session-a",
4008 2,
4009 json!({
4010 "role": "user",
4011 "content": "Please review the incident notes.",
4012 "timestamp_ms": 12
4013 }),
4014 );
4015
4016 assert!(spawn_notice.is_none());
4017 assert!(peer_update.is_none());
4018 assert!(real_user.is_some());
4019 }
4020
4021 #[test]
4022 fn session_history_projection_skips_non_transcript_messages() {
4023 let skipped = frame_from_session_history_message(
4024 "runtime-a",
4025 "agent-a",
4026 "session-a",
4027 0,
4028 json!({
4029 "content": "internal system prompt"
4030 }),
4031 );
4032 assert!(skipped.is_none());
4033 }
4034
4035 #[test]
4036 fn session_history_projection_extracts_assistant_blocks() {
4037 let frame = frame_from_session_history_message(
4038 "runtime-a",
4039 "agent-a",
4040 "session-a",
4041 0,
4042 json!({
4043 "role": "block_assistant",
4044 "blocks": [
4045 { "block_type": "text", "data": { "text": "hello " } },
4046 { "block_type": "text", "data": { "text": "there" } }
4047 ],
4048 "stop_reason": "end_turn"
4049 }),
4050 )
4051 .expect("assistant block history frame");
4052 assert_eq!(frame.payload["text"], json!("hello there"));
4053 }
4054
4055 #[test]
4056 fn session_history_projection_extracts_nested_text_block_data() {
4057 let frame = frame_from_session_history_message(
4058 "runtime-a",
4059 "agent-a",
4060 "session-a",
4061 0,
4062 json!({
4063 "role": "block_assistant",
4064 "blocks": [
4065 {
4066 "block_type": "text",
4067 "data": { "text": "Ready and standing by." }
4068 }
4069 ],
4070 "stop_reason": "end_turn",
4071 "created_at": "1970-01-01T00:00:00.010Z"
4072 }),
4073 )
4074 .expect("assistant block history frame");
4075
4076 assert_eq!(frame.kind, "interaction_complete");
4077 assert_eq!(frame.payload["result"], json!("Ready and standing by."));
4078 }
4079
4080 #[test]
4081 fn session_history_projection_drops_reasoning_blocks_from_result_text() {
4082 let frame = frame_from_session_history_message(
4083 "runtime-a",
4084 "agent-a",
4085 "session-a",
4086 0,
4087 json!({
4088 "role": "block_assistant",
4089 "blocks": [
4090 {
4091 "block_type": "reasoning",
4092 "data": { "text": "**Planning**\nI should not be rendered." }
4093 },
4094 {
4095 "block_type": "text",
4096 "data": { "text": "Visible answer." }
4097 }
4098 ],
4099 "stop_reason": "end_turn",
4100 "created_at": "1970-01-01T00:00:00.010Z"
4101 }),
4102 )
4103 .expect("assistant block history frame");
4104
4105 assert_eq!(frame.kind, "interaction_complete");
4106 assert_eq!(frame.payload["result"], json!("Visible answer."));
4107 assert_eq!(frame.payload["text"], json!("Visible answer."));
4108 }
4109
4110 #[test]
4111 fn session_history_projection_leaves_reasoning_only_result_empty() {
4112 let frame = frame_from_session_history_message(
4113 "runtime-a",
4114 "agent-a",
4115 "session-a",
4116 0,
4117 json!({
4118 "role": "block_assistant",
4119 "blocks": [
4120 {
4121 "block_type": "reasoning",
4122 "data": { "text": "Private planning text." }
4123 },
4124 {
4125 "block_type": "tool_use",
4126 "data": { "id": "toolu-1", "name": "peers", "args": {} }
4127 }
4128 ],
4129 "stop_reason": "end_turn",
4130 "created_at": "1970-01-01T00:00:00.010Z"
4131 }),
4132 )
4133 .expect("assistant block history frame");
4134
4135 assert_eq!(frame.kind, "interaction_complete");
4136 assert_eq!(frame.payload["result"], json!(""));
4137 assert_eq!(frame.payload["text"], json!(""));
4138 }
4139
4140 #[test]
4141 fn session_history_projection_preserves_tool_results() {
4142 let frames = frames_from_session_history_message(
4143 "runtime-a",
4144 "agent-a",
4145 "session-a",
4146 5,
4147 json!({
4148 "role": "tool_results",
4149 "results": [
4150 {
4151 "tool_use_id": "call-peers",
4152 "content": "{\"peers\":[{\"peer_id\":\"peer-1\",\"name\":\"mob/worker/peer-1\"}]}",
4153 "is_error": false
4154 }
4155 ],
4156 "created_at": "1970-01-01T00:00:00.050Z"
4157 }),
4158 );
4159
4160 assert_eq!(frames.len(), 1);
4161 let frame = &frames[0];
4162 assert_eq!(frame.kind, "tool_execution_completed");
4163 assert_eq!(frame.payload["tool_call_id"], json!("call-peers"));
4164 assert_eq!(
4165 frame.payload["result"],
4166 json!("{\"peers\":[{\"peer_id\":\"peer-1\",\"name\":\"mob/worker/peer-1\"}]}")
4167 );
4168 assert_eq!(frame.source.kind, ConsoleFrameSourceKind::SessionHistory);
4169 assert_eq!(frame.timestamp_ms, 50);
4170 }
4171
4172 #[test]
4173 fn session_history_projection_uses_rfc3339_created_at_timestamp() {
4174 let frame = frame_from_session_history_message(
4175 "runtime-a",
4176 "agent-a",
4177 "session-a",
4178 0,
4179 json!({
4180 "role": "user",
4181 "content": "hello",
4182 "created_at": "2026-05-12T05:00:06.564227Z"
4183 }),
4184 )
4185 .expect("user history frame");
4186
4187 assert_eq!(frame.timestamp_ms, 1_778_562_006_564);
4188 }
4189}