Skip to main content

meerkat_mobkit/console_aggregator/
mod.rs

1mod state;
2mod store;
3mod types;
4
5use std::collections::{BTreeMap, BTreeSet, HashMap};
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::sync::{Arc, RwLock};
8use std::time::{Duration, SystemTime, UNIX_EPOCH};
9
10use futures::future::join_all;
11use meerkat_core::{ContentInput, Message};
12use meerkat_mob::MobHandle;
13use meerkat_mob::ids::MeerkatId;
14use meerkat_mob::runtime::MobMemberListEntry;
15use serde_json::{Value, json};
16use sha2::{Digest, Sha256};
17use tokio::sync::{Semaphore, broadcast, oneshot};
18
19use crate::blob_store::BinaryBlobStore;
20use crate::console_contracts::SYSTEM_EVENT_IDENTITY;
21use crate::mob_handle_runtime::{
22    MobRuntime, assert_member_accepts_images, send_message_on_mob_with_mode,
23};
24use crate::runtime::ConsoleMember;
25use crate::unified_runtime::{ConsoleEventStore, UnifiedRuntime};
26
27pub use state::{
28    ReplaySubscriptionEffect, ReplaySubscriptionState, ReplaySubscriptionTransition, SendEffect,
29    SendState, SendTransition, SourceIngestionEffect, SourceIngestionState,
30    SourceIngestionTransition,
31};
32pub use store::{
33    ConsoleLogError, ConsoleLogResult, ConsoleLogStore, InMemoryConsoleLogStore,
34    SqliteConsoleLogStore,
35};
36pub use types::{
37    AppendDisposition, AppendOutcome, ConsoleCursor, ConsoleFrame, ConsoleFrameSource,
38    ConsoleFrameSourceKind, ConsoleFrameStatus, ConsoleIdentityInspection, ConsoleIdentityRecord,
39    ConsoleInteractionAccepted, ConsoleReplayUnavailable, ConsoleSendRequest, ConsoleTimelineEvent,
40    ConsoleTimelinePage, ConsoleTimelineQuery, ConsoleVisibility, NewConsoleFrame,
41};
42
/// Capacity passed to `broadcast::channel` when the aggregator creates its timeline
/// event channel.
const TIMELINE_CHANNEL_CAP: usize = 1024;
// NOTE(review): the constants below are not referenced in the visible part of this file;
// the descriptions are inferred from their names and units — confirm at the use sites.
/// Presumably the page size used when reading session history.
const SESSION_HISTORY_PAGE_LIMIT: usize = 500;
/// Presumably how long (in milliseconds) a session-history refresh stays fresh.
const SESSION_HISTORY_REFRESH_TTL_MS: u64 = 30_000;
/// Presumably a shorter freshness window (in milliseconds) for sessions still growing.
const SESSION_HISTORY_GROWING_REFRESH_TTL_MS: u64 = 2_000;
/// Presumably the period of the session-history discovery loop.
const SESSION_HISTORY_DISCOVERY_INTERVAL: Duration = Duration::from_secs(5);
/// Presumably the maximum wait for a backfill triggered by an explicit identity query.
const EXPLICIT_IDENTITY_BACKFILL_WAIT: Duration = Duration::from_millis(750);
49
/// Handle to the console aggregator.
///
/// All state lives behind a single `Arc<AggregatorInner>`, so cloning the handle is cheap
/// and every clone observes the same store, runtime registry, and event channel.
#[derive(Clone)]
pub struct MobKitConsoleAggregator {
    inner: Arc<AggregatorInner>,
}
54
/// Tunables accepted by [`MobKitConsoleAggregator::new_with_options`].
#[derive(Debug, Clone, Copy)]
pub struct ConsoleAggregatorOptions {
    /// Toggle for session-history backfill.
    // NOTE(review): the code reading this flag is outside the visible chunk — confirm
    // exactly which backfill paths it gates.
    pub session_history_backfill_enabled: bool,
    /// Number of permits given to the session-backfill semaphore. The aggregator
    /// constructor raises a value of `0` to `1`.
    pub max_concurrent_session_backfills: usize,
}

impl Default for ConsoleAggregatorOptions {
    /// Backfill enabled, with up to 16 concurrent session backfills.
    fn default() -> Self {
        let session_history_backfill_enabled = true;
        let max_concurrent_session_backfills = 16;
        ConsoleAggregatorOptions {
            session_history_backfill_enabled,
            max_concurrent_session_backfills,
        }
    }
}
69
/// Shared state behind every [`MobKitConsoleAggregator`] clone.
struct AggregatorInner {
    /// Backing log store that frames are appended to and queried from.
    store: Arc<dyn ConsoleLogStore>,
    /// Registered runtimes, keyed by their `runtime_key`.
    runtimes: RwLock<BTreeMap<String, RuntimeEntry>>,
    /// Broadcast sender used to publish timeline events to subscribers.
    event_tx: broadcast::Sender<ConsoleTimelineEvent>,
    // NOTE(review): the two sets below are only initialised (empty) in the visible chunk;
    // presumably they track in-flight backfills to avoid duplicates — confirm at use sites.
    active_session_backfills: tokio::sync::Mutex<BTreeSet<String>>,
    opportunistic_session_backfills: tokio::sync::Mutex<BTreeSet<String>>,
    /// Semaphore created with `options.max_concurrent_session_backfills` permits.
    session_backfill_permits: Arc<Semaphore>,
    /// Cached list of identity records served by `list_identities`.
    identity_read_model: ConsoleIdentityReadModel,
    /// Options the aggregator was built with (after clamping).
    options: ConsoleAggregatorOptions,
}
80
/// Cached snapshot of identity records plus the bookkeeping needed to refresh it.
///
/// Every field is an `Arc`, so clones share the same cache, lock, and flag.
#[derive(Clone)]
struct ConsoleIdentityReadModel {
    /// The cached identity records.
    inner: Arc<tokio::sync::RwLock<Vec<ConsoleIdentityRecord>>>,
    /// Held while a refresh collects identity records, so refreshes do not overlap.
    refresh_lock: Arc<tokio::sync::Mutex<()>>,
    /// Set to `true` once the cache has been filled at least once.
    primed: Arc<AtomicBool>,
}
87
88impl Default for ConsoleIdentityReadModel {
89    fn default() -> Self {
90        Self {
91            inner: Arc::new(tokio::sync::RwLock::new(Vec::new())),
92            refresh_lock: Arc::new(tokio::sync::Mutex::new(())),
93            primed: Arc::new(AtomicBool::new(false)),
94        }
95    }
96}
97
98impl ConsoleIdentityReadModel {
99    async fn snapshot(
100        &self,
101        inner: Arc<AggregatorInner>,
102    ) -> ConsoleLogResult<Vec<ConsoleIdentityRecord>> {
103        if !self.primed.load(Ordering::Acquire) {
104            self.prime_now(inner).await?;
105        }
106        Ok(self.inner.read().await.clone())
107    }
108
109    async fn prime_now(&self, inner: Arc<AggregatorInner>) -> ConsoleLogResult<()> {
110        if self.primed.load(Ordering::Acquire) {
111            return Ok(());
112        }
113        let _guard = self.refresh_lock.clone().lock_owned().await;
114        if self.primed.load(Ordering::Acquire) {
115            return Ok(());
116        }
117        let identities = collect_identity_records(&inner).await?;
118        *self.inner.write().await = identities;
119        self.primed.store(true, Ordering::Release);
120        Ok(())
121    }
122
123    fn refresh_soon(&self, inner: Arc<AggregatorInner>) {
124        let Ok(runtime_handle) = tokio::runtime::Handle::try_current() else {
125            return;
126        };
127        let Ok(guard) = self.refresh_lock.clone().try_lock_owned() else {
128            return;
129        };
130        let read_model = self.clone();
131        runtime_handle.spawn(async move {
132            let _guard = guard;
133            match collect_identity_records(&inner).await {
134                Ok(identities) => {
135                    *read_model.inner.write().await = identities;
136                    read_model.primed.store(true, Ordering::Release);
137                }
138                Err(err) => {
139                    tracing::warn!(error = %err, "console identity read-model refresh failed");
140                }
141            }
142        });
143    }
144
145    async fn replace(&self, identities: Vec<ConsoleIdentityRecord>) {
146        *self.inner.write().await = identities;
147        self.primed.store(true, Ordering::Release);
148    }
149}
150
/// One registered runtime as stored in the aggregator's runtime registry.
#[derive(Clone)]
struct RuntimeEntry {
    /// Key under which this entry is stored in the registry.
    runtime_key: String,
    /// Namespace passed to `strip_namespace` when resolving console identities.
    identity_namespace: String,
    /// Runtime handle; used for its session service and binary blob store.
    runtime: MobRuntime,
    /// Source of console events for this runtime (subscribed to and replayed on
    /// registration; also used to reserve interactions when sending).
    console_events: ConsoleEventStore,
    /// Policy consulted before identities of this runtime are exposed.
    visibility_policy: Arc<dyn ConsoleVisibilityPolicy>,
}
159
/// A console identity resolved to a concrete member of a registered runtime.
#[derive(Clone)]
struct ResolvedConsoleMember {
    /// Registry entry of the runtime the member belongs to.
    entry: RuntimeEntry,
    /// Mob handle used to act on the member (e.g. retire, resolve its session id).
    handle: MobHandle,
    /// The member list entry that matched the identity.
    member: MobMemberListEntry,
    /// Identity string converted to a `MeerkatId` when calling into the runtime.
    runtime_identity: String,
}
167
/// Per-runtime policy deciding what the console may expose.
///
/// Every method has a permissive default, so an empty `impl` allows everything.
pub trait ConsoleVisibilityPolicy: Send + Sync {
    /// Whether implicit delegate members should be included. Defaults to `true`.
    // NOTE(review): the consumer of this flag is outside the visible chunk.
    fn include_implicit_delegate_members(&self) -> bool {
        true
    }

    /// Whether `_member` may be shown. Defaults to `true`.
    fn member_visible(&self, _member: &ConsoleMember) -> bool {
        true
    }

    /// Whether the identity described by `_record` may be shown. Defaults to `true`.
    ///
    /// The aggregator treats an identity for which this returns `false` as if it did
    /// not exist (inspect returns `None`, send reports an unknown identity, retire
    /// skips it).
    fn identity_visible(&self, _record: &ConsoleIdentityRecord) -> bool {
        true
    }

    /// Whether `_frame` may be shown. Defaults to `true`.
    fn frame_visible(&self, _frame: &ConsoleFrame) -> bool {
        true
    }

    /// Optional replacement payload for a frame about to be persisted.
    ///
    /// Returning `Some(value)` makes the send path store `value` instead of the original
    /// payload and mark the frame as redacted. Defaults to `None` (no redaction).
    fn redact_payload(&self, _frame: &NewConsoleFrame) -> Option<Value> {
        None
    }
}
189
/// Visibility policy that keeps every trait default, i.e. hides and redacts nothing.
#[derive(Debug, Default)]
pub struct AllowAllConsoleVisibilityPolicy;

// Intentionally empty: all `ConsoleVisibilityPolicy` defaults are used as-is.
impl ConsoleVisibilityPolicy for AllowAllConsoleVisibilityPolicy {}
194
195#[derive(Debug, Default)]
196pub struct HideImplicitDelegateMembersConsoleVisibilityPolicy;
197
198impl ConsoleVisibilityPolicy for HideImplicitDelegateMembersConsoleVisibilityPolicy {
199    fn include_implicit_delegate_members(&self) -> bool {
200        false
201    }
202
203    fn member_visible(&self, member: &ConsoleMember) -> bool {
204        !is_implicit_delegate_member(member.role.as_str(), &member.labels)
205    }
206
207    fn identity_visible(&self, record: &ConsoleIdentityRecord) -> bool {
208        !is_implicit_delegate_member(
209            record
210                .labels
211                .get("role")
212                .map(String::as_str)
213                .unwrap_or_default(),
214            &record.labels,
215        )
216    }
217}
218
/// Arguments for [`MobKitConsoleAggregator::register_runtime`].
#[derive(Clone)]
pub struct ConsoleRuntimeRegistration {
    /// Key under which the runtime is stored in the aggregator's registry.
    pub runtime_key: String,
    /// Runtime whose mob runtime and console event store are registered.
    pub runtime: Arc<UnifiedRuntime>,
    /// Identity namespace associated with this runtime.
    pub identity_namespace: String,
    /// Visibility policy applied to this runtime's identities.
    pub visibility_policy: Arc<dyn ConsoleVisibilityPolicy>,
}
226
227impl MobKitConsoleAggregator {
228    pub fn new(store: Arc<dyn ConsoleLogStore>) -> Self {
229        Self::new_with_options(store, ConsoleAggregatorOptions::default())
230    }
231
232    pub fn new_with_options(
233        store: Arc<dyn ConsoleLogStore>,
234        mut options: ConsoleAggregatorOptions,
235    ) -> Self {
236        options.max_concurrent_session_backfills = options.max_concurrent_session_backfills.max(1);
237        let (event_tx, _) = broadcast::channel(TIMELINE_CHANNEL_CAP);
238        Self {
239            inner: Arc::new(AggregatorInner {
240                store,
241                runtimes: RwLock::new(BTreeMap::new()),
242                event_tx,
243                active_session_backfills: tokio::sync::Mutex::new(BTreeSet::new()),
244                opportunistic_session_backfills: tokio::sync::Mutex::new(BTreeSet::new()),
245                session_backfill_permits: Arc::new(Semaphore::new(
246                    options.max_concurrent_session_backfills,
247                )),
248                identity_read_model: ConsoleIdentityReadModel::default(),
249                options,
250            }),
251        }
252    }
253
254    pub fn in_memory() -> Self {
255        Self::new(Arc::new(InMemoryConsoleLogStore::new()))
256    }
257
258    pub fn in_memory_with_options(options: ConsoleAggregatorOptions) -> Self {
259        Self::new_with_options(Arc::new(InMemoryConsoleLogStore::new()), options)
260    }
261
262    pub fn subscribe(&self) -> broadcast::Receiver<ConsoleTimelineEvent> {
263        self.inner.event_tx.subscribe()
264    }
265
266    pub fn store(&self) -> Arc<dyn ConsoleLogStore> {
267        self.inner.store.clone()
268    }
269
270    pub fn register_runtime(&self, registration: ConsoleRuntimeRegistration) {
271        self.register_runtime_handles_with_policy(
272            registration.runtime_key,
273            registration.identity_namespace,
274            registration.runtime.mob_runtime().clone(),
275            registration.runtime.console_events(),
276            registration.visibility_policy,
277        );
278    }
279
280    pub(crate) fn register_runtime_handles_with_policy(
281        &self,
282        runtime_key: impl Into<String>,
283        identity_namespace: impl Into<String>,
284        runtime: MobRuntime,
285        console_events: ConsoleEventStore,
286        visibility_policy: Arc<dyn ConsoleVisibilityPolicy>,
287    ) {
288        let runtime_key = runtime_key.into();
289        let identity_namespace = identity_namespace.into();
290        let entry = RuntimeEntry {
291            runtime_key: runtime_key.clone(),
292            identity_namespace,
293            runtime,
294            console_events: console_events.clone(),
295            visibility_policy,
296        };
297        if let Ok(mut runtimes) = self.inner.runtimes.write() {
298            runtimes.insert(runtime_key.clone(), entry);
299        }
300        self.inner
301            .identity_read_model
302            .refresh_soon(self.inner.clone());
303        let inner = self.inner.clone();
304        let events_for_live = console_events.clone();
305        let events_for_live_recovery = console_events.clone();
306        let runtime_key_for_live = runtime_key.clone();
307        tokio::spawn(async move {
308            let mut rx = events_for_live.subscribe();
309            loop {
310                match rx.recv().await {
311                    Ok(envelope) => {
312                        let _ =
313                            project_console_event(inner.clone(), &runtime_key_for_live, envelope)
314                                .await;
315                    }
316                    Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
317                        let _ = recover_lagged_source_events(
318                            inner.clone(),
319                            &runtime_key_for_live,
320                            &events_for_live_recovery,
321                        )
322                        .await;
323                    }
324                    Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
325                }
326            }
327        });
328
329        let inner = self.inner.clone();
330        let events_for_replay = console_events;
331        let runtime_key_for_replay = runtime_key;
332        tokio::spawn(async move {
333            let mut ingestion_state = SourceIngestionState::Registered;
334            if let Ok((next, _effects)) =
335                ingestion_state.apply(SourceIngestionTransition::StartBackfill)
336            {
337                ingestion_state = next;
338            }
339            if let Ok(events) = events_for_replay.replay_all(None).await {
340                for envelope in events {
341                    let _ = project_console_event(inner.clone(), &runtime_key_for_replay, envelope)
342                        .await;
343                }
344            }
345            spawn_session_history_backfill(inner.clone(), runtime_key_for_replay.clone());
346            spawn_session_history_discovery_loop(inner.clone(), runtime_key_for_replay.clone());
347            if let Ok((next, _effects)) =
348                ingestion_state.apply(SourceIngestionTransition::BackfillComplete)
349            {
350                ingestion_state = next;
351            }
352            let _ = ingestion_state.apply(SourceIngestionTransition::StartLive);
353        });
354    }
355
356    pub async fn list_identities(&self) -> ConsoleLogResult<Vec<ConsoleIdentityRecord>> {
357        self.inner
358            .identity_read_model
359            .refresh_soon(self.inner.clone());
360        let identities = self
361            .inner
362            .identity_read_model
363            .snapshot(self.inner.clone())
364            .await?;
365        spawn_identity_backfills_for_records(self.inner.clone(), &identities);
366        Ok(identities)
367    }
368
369    pub(crate) async fn list_identities_fresh(
370        &self,
371    ) -> ConsoleLogResult<Vec<ConsoleIdentityRecord>> {
372        let _guard = self
373            .inner
374            .identity_read_model
375            .refresh_lock
376            .clone()
377            .lock_owned()
378            .await;
379        let identities = collect_identity_records(&self.inner).await?;
380        self.inner
381            .identity_read_model
382            .replace(identities.clone())
383            .await;
384        spawn_identity_backfills_for_records(self.inner.clone(), &identities);
385        Ok(identities)
386    }
387
388    pub async fn inspect_identity(
389        &self,
390        identity: &str,
391    ) -> ConsoleLogResult<Option<ConsoleIdentityInspection>> {
392        let Some(resolved) = self.resolve_member(identity).await else {
393            return Ok(None);
394        };
395        let Some(record) =
396            identity_record_for_member(&resolved.entry, &resolved.handle, &resolved.member).await
397        else {
398            return Ok(None);
399        };
400        if !resolved.entry.visibility_policy.identity_visible(&record) {
401            return Ok(None);
402        }
403        if let Some(session_id) = record.session_id.clone() {
404            spawn_session_history_backfill_target(
405                self.inner.clone(),
406                SessionBackfillTarget {
407                    entry: resolved.entry.clone(),
408                    record: record.clone(),
409                    session_id,
410                },
411                false,
412            );
413        }
414        let peers = resolved
415            .member
416            .wired_to
417            .iter()
418            .map(ToString::to_string)
419            .collect();
420        Ok(Some(ConsoleIdentityInspection {
421            identity: record,
422            peers,
423        }))
424    }
425
426    pub async fn retire_identity(&self, identity: &str) -> ConsoleLogResult<bool> {
427        let matches = self.resolve_members(identity).await;
428        let mut retired_any = false;
429        for resolved in matches {
430            let Some(record) =
431                identity_record_for_member(&resolved.entry, &resolved.handle, &resolved.member)
432                    .await
433            else {
434                continue;
435            };
436            if !resolved.entry.visibility_policy.identity_visible(&record) {
437                continue;
438            }
439            resolved
440                .handle
441                .retire(MeerkatId::from(resolved.runtime_identity.as_str()))
442                .await
443                .map_err(|err| -> ConsoleLogError {
444                    format!("retire failed for {identity}: {err}").into()
445                })?;
446            retired_any = true;
447        }
448        Ok(retired_any)
449    }
450
451    pub async fn clear_timeline_frames(&self) -> ConsoleLogResult<()> {
452        self.inner.store.clear_frames().await
453    }
454
455    pub async fn query_timeline(
456        &self,
457        query: ConsoleTimelineQuery,
458    ) -> ConsoleLogResult<ConsoleTimelinePage> {
459        let explicit_identity = query.identity.clone();
460        let mut page = self.inner.store.query_frames(query.clone()).await?;
461        if page.frames.is_empty()
462            && let Some(identity) = explicit_identity.clone()
463        {
464            backfill_identity_for_explicit_query(self.inner.clone(), identity).await;
465            page = self.inner.store.query_frames(query).await?;
466        }
467        let mut visible_frames = Vec::with_capacity(page.frames.len());
468        let mut identity_visibility_cache = HashMap::new();
469        for frame in page.frames {
470            let allow_historical_identity =
471                explicit_identity.as_deref() == Some(frame.identity.as_str());
472            if frame_is_visible_cached(
473                &self.inner,
474                &frame,
475                allow_historical_identity,
476                &mut identity_visibility_cache,
477            )
478            .await
479            .unwrap_or(false)
480            {
481                visible_frames.push(frame);
482            }
483        }
484        page.frames = visible_frames;
485        Ok(page)
486    }
487
488    pub async fn refresh_session_history(&self) -> ConsoleLogResult<()> {
489        let runtime_keys = self
490            .inner
491            .runtimes
492            .read()
493            .map_err(|_| runtime_registry_lock_error())?
494            .keys()
495            .cloned()
496            .collect::<Vec<_>>();
497        let results =
498            join_all(runtime_keys.into_iter().map(|runtime_key| {
499                backfill_session_history(self.inner.clone(), runtime_key, true)
500            }))
501            .await;
502        for result in results {
503            result?;
504        }
505        Ok(())
506    }
507
508    pub async fn latest_cursor(&self) -> ConsoleLogResult<Option<ConsoleCursor>> {
509        self.inner.store.latest_cursor().await
510    }
511
512    pub async fn timeline_event_visible(&self, event: &ConsoleTimelineEvent) -> bool {
513        match event {
514            ConsoleTimelineEvent::ConsoleFrame { frame }
515            | ConsoleTimelineEvent::FrameUpdated { frame } => {
516                frame_is_visible(&self.inner, frame, false)
517                    .await
518                    .unwrap_or(false)
519            }
520            ConsoleTimelineEvent::SnapshotStarted { .. }
521            | ConsoleTimelineEvent::SnapshotComplete { .. }
522            | ConsoleTimelineEvent::ReplayUnavailable { .. } => true,
523        }
524    }
525
526    pub async fn timeline_frame_visible_for_query(
527        &self,
528        frame: &ConsoleFrame,
529        identity: Option<&str>,
530    ) -> bool {
531        let allow_historical_identity = identity == Some(frame.identity.as_str());
532        frame_is_visible(&self.inner, frame, allow_historical_identity)
533            .await
534            .unwrap_or(false)
535    }
536
    /// Accepts a console message for `request.identity`, persists it as a `user_input`
    /// frame, and dispatches it to the member in the background.
    ///
    /// The call is idempotent per (runtime, identity, origin, idempotency key): repeating
    /// the same request returns the acceptance derived from the stored frame, while reusing
    /// the key with different content, origin, or handling mode is rejected.
    ///
    /// # Errors
    ///
    /// - `UnknownIdentity` when the identity cannot be resolved, has no record, or is
    ///   hidden by the visibility policy.
    /// - `NotAddressable` / `Retired` when the member cannot receive messages.
    /// - `InvalidContent` when the member does not accept the supplied content.
    /// - `IdempotencyConflict` when the key was already used for a different request.
    /// - `Log` / `State` for store and state-machine failures.
    pub async fn send(
        &self,
        request: ConsoleSendRequest,
    ) -> Result<ConsoleInteractionAccepted, ConsoleSendError> {
        validate_send_request(&request)?;
        // A member that is missing, has no record, or is hidden is reported identically.
        let Some(resolved) = self.resolve_member(&request.identity).await else {
            return Err(ConsoleSendError::UnknownIdentity(request.identity));
        };
        let Some(record) =
            identity_record_for_member(&resolved.entry, &resolved.handle, &resolved.member).await
        else {
            return Err(ConsoleSendError::UnknownIdentity(request.identity));
        };
        if !resolved.entry.visibility_policy.identity_visible(&record) {
            return Err(ConsoleSendError::UnknownIdentity(request.identity));
        }
        if !member_is_addressable(&resolved.member) {
            return Err(ConsoleSendError::NotAddressable(request.identity));
        }
        // A member in the `Retiring` state is reported to the caller as retired.
        if resolved.member.state == meerkat_mob::MemberState::Retiring {
            return Err(ConsoleSendError::Retired(request.identity));
        }

        let content = content_input_from_value(&request.content)?;
        let handling_mode = parse_handling_mode(request.handling_mode.as_deref())?;
        assert_member_accepts_images(
            &resolved.handle,
            resolved.entry.runtime.session_service(),
            &resolved.runtime_identity,
            &content,
        )
        .await
        .map_err(|err| ConsoleSendError::InvalidContent(err.to_string()))?;

        let dedupe_key = send_dedupe_key(
            &resolved.entry.runtime_key,
            &request.identity,
            &request.origin,
            &request.idempotency_key,
        );
        // An absent handling mode is recorded as "queue".
        let handling_mode_value = request
            .handling_mode
            .as_deref()
            .unwrap_or("queue")
            .to_string();
        let request_fingerprint =
            send_request_fingerprint(&request.origin, &request.content, &handling_mode_value);
        if let Some(existing) = self
            .inner
            .store
            .frame_by_dedupe_key(&dedupe_key)
            .await
            .map_err(ConsoleSendError::Log)?
        {
            // Same request if the stored fingerprint matches, OR — for frames stored
            // without a fingerprint — if origin, content, and handling mode all match.
            // (`&&` binds tighter than `||`, so the whole `&&` chain is the fallback.)
            let same_request = existing.source.source_cursor.as_deref()
                == Some(request_fingerprint.as_str())
                || existing.source.source_cursor.is_none()
                    && existing.payload.get("origin").and_then(Value::as_str)
                        == Some(request.origin.as_str())
                    && existing.payload.get("content") == Some(&request.content)
                    && existing
                        .payload
                        .get("handling_mode")
                        .and_then(Value::as_str)
                        == Some(handling_mode_value.as_str());
            if !same_request {
                return Err(ConsoleSendError::IdempotencyConflict(
                    request.idempotency_key,
                ));
            }
            return Ok(accepted_from_frame(&existing));
        }

        // The interaction id is derived from the dedupe key, so retries map to the same id.
        let interaction_id = format!("console-interaction-{}", hash_short(&dedupe_key));
        // The interaction is reserved on the runtime's event store before the frame is persisted.
        resolved
            .entry
            .console_events
            .reserve_interaction_value(
                &resolved.runtime_identity,
                Some(resolved.runtime_identity.as_str()),
                &interaction_id,
                &request.origin,
                request.content.clone(),
            )
            .await
            .map_err(ConsoleSendError::State)?;
        let session_id = resolved
            .handle
            .resolve_bridge_session_id(&MeerkatId::from(resolved.runtime_identity.as_str()))
            .await
            .map(|sid| sid.to_string());
        let mut new_frame = NewConsoleFrame {
            id: None,
            dedupe_key,
            timestamp_ms: current_time_ms(),
            runtime_key: resolved.entry.runtime_key.clone(),
            identity: request.identity.clone(),
            conversation_id: Some(request.identity.clone()),
            session_id: session_id.clone(),
            kind: "user_input".to_string(),
            status: ConsoleFrameStatus::Accepted,
            payload: json!({
                "content": request.content,
                "origin": request.origin,
                "idempotency_key": request.idempotency_key,
                "handling_mode": handling_mode_value,
            }),
            source: ConsoleFrameSource {
                kind: ConsoleFrameSourceKind::Send,
                // The fingerprint is stored here so later retries can be compared cheaply.
                source_cursor: Some(request_fingerprint),
            },
            source_event_id: None,
            interaction_id: Some(interaction_id.clone()),
            turn_id: None,
            run_id: None,
            parent_frame_id: None,
            caused_by_frame_id: None,
        };
        // The policy may replace the payload; such frames are stored as redacted.
        if let Some(redacted) = resolved.entry.visibility_policy.redact_payload(&new_frame) {
            new_frame.payload = redacted;
            new_frame.status = ConsoleFrameStatus::Redacted;
        }

        // The transition is applied only to validate it; its result is discarded.
        let _ = SendState::Requested
            .apply(SendTransition::PersistAccepted)
            .map_err(ConsoleSendError::State)?;
        let outcome = self
            .inner
            .store
            .append_if_absent(new_frame)
            .await
            .map_err(ConsoleSendError::Log)?;
        // Another caller stored the same dedupe key first: return its frame without
        // broadcasting or dispatching again.
        if outcome.disposition == AppendDisposition::Existing {
            return Ok(accepted_from_frame(&outcome.frame));
        }
        // The broadcast result is ignored.
        let _ = self
            .inner
            .event_tx
            .send(ConsoleTimelineEvent::ConsoleFrame {
                frame: outcome.frame.clone(),
            });
        let accepted = accepted_from_frame(&outcome.frame);

        let (dispatching, _effects) = SendState::AcceptedPersisted
            .apply(SendTransition::StartDispatch)
            .map_err(ConsoleSendError::State)?;
        update_frame_status_and_emit(
            &self.inner,
            &outcome.frame.id,
            ConsoleFrameStatus::Dispatching,
        )
        .await
        .map_err(ConsoleSendError::Log)?;

        // Delivery itself happens in the background; the caller gets the acceptance now.
        spawn_console_send_dispatch(
            self.inner.clone(),
            resolved,
            content,
            handling_mode,
            dispatching,
            outcome.frame,
            interaction_id,
        );
        Ok(accepted)
    }
702
703    pub async fn reserve_identity_first_interaction(
704        &self,
705        request: ConsoleSendRequest,
706        session_id: Option<&str>,
707    ) -> Result<ConsoleInteractionAccepted, ConsoleSendError> {
708        validate_send_request(&request)?;
709        let _content = content_input_from_value(&request.content)?;
710        let handling_mode_value = request
711            .handling_mode
712            .as_deref()
713            .unwrap_or("queue")
714            .to_string();
715        let dedupe_key = send_dedupe_key(
716            "identity-first",
717            &request.identity,
718            &request.origin,
719            &request.idempotency_key,
720        );
721        let request_fingerprint =
722            send_request_fingerprint(&request.origin, &request.content, &handling_mode_value);
723        if let Some(existing) = self
724            .inner
725            .store
726            .frame_by_dedupe_key(&dedupe_key)
727            .await
728            .map_err(ConsoleSendError::Log)?
729        {
730            let same_request = existing.source.source_cursor.as_deref()
731                == Some(request_fingerprint.as_str())
732                || existing.source.source_cursor.is_none()
733                    && existing.payload.get("origin").and_then(Value::as_str)
734                        == Some(request.origin.as_str())
735                    && existing.payload.get("content") == Some(&request.content)
736                    && existing
737                        .payload
738                        .get("handling_mode")
739                        .and_then(Value::as_str)
740                        == Some(handling_mode_value.as_str());
741            if !same_request {
742                return Err(ConsoleSendError::IdempotencyConflict(
743                    request.idempotency_key,
744                ));
745            }
746            return Ok(accepted_from_frame(&existing));
747        }
748
749        let interaction_id = format!("console-interaction-{}", hash_short(&dedupe_key));
750        let new_frame = NewConsoleFrame {
751            id: None,
752            dedupe_key,
753            timestamp_ms: current_time_ms(),
754            runtime_key: "identity-first".to_string(),
755            identity: request.identity.clone(),
756            conversation_id: Some(request.identity.clone()),
757            session_id: session_id.map(ToString::to_string),
758            kind: "user_input".to_string(),
759            status: ConsoleFrameStatus::Accepted,
760            payload: json!({
761                "content": request.content,
762                "origin": request.origin,
763                "idempotency_key": request.idempotency_key,
764                "handling_mode": handling_mode_value,
765            }),
766            source: ConsoleFrameSource {
767                kind: ConsoleFrameSourceKind::Send,
768                source_cursor: Some(request_fingerprint),
769            },
770            source_event_id: None,
771            interaction_id: Some(interaction_id),
772            turn_id: None,
773            run_id: None,
774            parent_frame_id: None,
775            caused_by_frame_id: None,
776        };
777        let outcome = self
778            .inner
779            .store
780            .append_if_absent(new_frame)
781            .await
782            .map_err(ConsoleSendError::Log)?;
783        let _ = self
784            .inner
785            .event_tx
786            .send(ConsoleTimelineEvent::ConsoleFrame {
787                frame: outcome.frame.clone(),
788            });
789        Ok(accepted_from_frame(&outcome.frame))
790    }
791
792    pub async fn mark_interaction_delivery_failed(
793        &self,
794        input_frame_id: &str,
795    ) -> Result<(), ConsoleSendError> {
796        update_frame_status_and_emit(
797            &self.inner,
798            input_frame_id,
799            ConsoleFrameStatus::DeliveryFailed,
800        )
801        .await
802        .map_err(ConsoleSendError::Log)?;
803        Ok(())
804    }
805
806    pub async fn binary_blob_store_for_identity(
807        &self,
808        identity: &str,
809    ) -> Result<Option<Arc<dyn BinaryBlobStore>>, ConsoleSendError> {
810        if identity.trim().is_empty() {
811            return Err(ConsoleSendError::InvalidRequest(
812                "identity must be non-empty".to_string(),
813            ));
814        }
815        let Some(resolved) = self.resolve_member(identity).await else {
816            return Err(ConsoleSendError::UnknownIdentity(identity.to_string()));
817        };
818        let Some(record) =
819            identity_record_for_member(&resolved.entry, &resolved.handle, &resolved.member).await
820        else {
821            return Err(ConsoleSendError::UnknownIdentity(identity.to_string()));
822        };
823        if !resolved.entry.visibility_policy.identity_visible(&record) {
824            return Err(ConsoleSendError::UnknownIdentity(identity.to_string()));
825        }
826        if !member_is_addressable(&resolved.member) {
827            return Err(ConsoleSendError::NotAddressable(identity.to_string()));
828        }
829        if resolved.member.state == meerkat_mob::MemberState::Retiring {
830            return Err(ConsoleSendError::Retired(identity.to_string()));
831        }
832        Ok(resolved.entry.runtime.binary_blob_store())
833    }
834
835    pub fn binary_blob_stores(&self) -> Vec<Arc<dyn BinaryBlobStore>> {
836        self.inner
837            .runtimes
838            .read()
839            .map(|entries| {
840                entries
841                    .values()
842                    .filter_map(|entry| entry.runtime.binary_blob_store())
843                    .collect()
844            })
845            .unwrap_or_default()
846    }
847
848    async fn resolve_member(&self, identity: &str) -> Option<ResolvedConsoleMember> {
849        self.resolve_members(identity).await.into_iter().next()
850    }
851
852    async fn resolve_members(&self, identity: &str) -> Vec<ResolvedConsoleMember> {
853        let entries = self
854            .inner
855            .runtimes
856            .read()
857            .ok()
858            .map(|entries| entries.clone())
859            .unwrap_or_default();
860        let mut matches: Vec<(String, ResolvedConsoleMember)> = Vec::new();
861        for entry in entries.values() {
862            let Some(raw_identity) = strip_namespace(identity, &entry.identity_namespace) else {
863                continue;
864            };
865            let mid = MeerkatId::from(raw_identity.as_str());
866            for resolved in member_sources_for_entry(entry)
867                .await
868                .into_iter()
869                .filter(|candidate| {
870                    candidate.member.agent_identity == mid
871                        || candidate
872                            .member
873                            .labels
874                            .get("agent_identity")
875                            .is_some_and(|agent_identity| agent_identity == &raw_identity)
876                })
877            {
878                let session_id = resolved
879                    .handle
880                    .resolve_bridge_session_id(&resolved.member.agent_identity)
881                    .await
882                    .map(|sid| sid.to_string())
883                    .unwrap_or_default();
884                matches.push((session_id, resolved));
885            }
886        }
887        matches.sort_by(|left, right| right.0.cmp(&left.0));
888        matches.into_iter().map(|(_, resolved)| resolved).collect()
889    }
890}
891
892fn dedupe_identity_records(records: Vec<ConsoleIdentityRecord>) -> Vec<ConsoleIdentityRecord> {
893    let mut by_identity: BTreeMap<String, ConsoleIdentityRecord> = BTreeMap::new();
894    for record in records {
895        by_identity
896            .entry(record.identity.clone())
897            .and_modify(|current| {
898                if identity_record_prefer(&record, current) {
899                    *current = record.clone();
900                }
901            })
902            .or_insert(record);
903    }
904    by_identity.into_values().collect()
905}
906
907fn identity_record_prefer(
908    candidate: &ConsoleIdentityRecord,
909    current: &ConsoleIdentityRecord,
910) -> bool {
911    let candidate_live = candidate.addressable && candidate.health != "retired";
912    let current_live = current.addressable && current.health != "retired";
913    if candidate_live != current_live {
914        return candidate_live;
915    }
916    candidate.session_id.as_deref().unwrap_or("") > current.session_id.as_deref().unwrap_or("")
917}
918
919async fn collect_identity_records(
920    inner: &Arc<AggregatorInner>,
921) -> ConsoleLogResult<Vec<ConsoleIdentityRecord>> {
922    let entries = inner
923        .runtimes
924        .read()
925        .map_err(|_| runtime_registry_lock_error())?
926        .clone();
927    let mut identities = Vec::new();
928    for entry in entries.values() {
929        for resolved in member_sources_for_entry(entry).await {
930            if let Some(record) =
931                identity_record_for_member(entry, &resolved.handle, &resolved.member).await
932                && entry.visibility_policy.identity_visible(&record)
933            {
934                identities.push(record);
935            }
936        }
937    }
938    Ok(dedupe_identity_records(identities))
939}
940
941fn spawn_identity_backfills_for_records(
942    inner: Arc<AggregatorInner>,
943    records: &[ConsoleIdentityRecord],
944) {
945    if !inner.options.session_history_backfill_enabled {
946        return;
947    }
948    let entries = match inner.runtimes.read() {
949        Ok(entries) => entries.clone(),
950        Err(_) => return,
951    };
952    for record in records {
953        let Some(entry) = entries.get(&record.runtime_key).cloned() else {
954            continue;
955        };
956        let Some(session_id) = record.session_id.clone() else {
957            continue;
958        };
959        spawn_session_history_backfill_target(
960            inner.clone(),
961            SessionBackfillTarget {
962                entry,
963                record: record.clone(),
964                session_id,
965            },
966            false,
967        );
968    }
969}
970
971async fn member_sources_for_entry(entry: &RuntimeEntry) -> Vec<ResolvedConsoleMember> {
972    let mut resolved = Vec::new();
973    let primary_handle = entry.runtime.handle();
974    let primary_mob_id = primary_handle.mob_id().to_string();
975    for member in primary_handle.list_members_including_retiring().await {
976        resolved.push(ResolvedConsoleMember {
977            entry: entry.clone(),
978            handle: primary_handle.clone(),
979            runtime_identity: member.agent_identity.to_string(),
980            member,
981        });
982    }
983
984    let Some(state) = entry.runtime.agent_mob_mcp_state() else {
985        return resolved;
986    };
987    if !entry.visibility_policy.include_implicit_delegate_members() {
988        return resolved;
989    }
990    for (mob_id, _state) in state.mob_list().await {
991        if mob_id.as_str() == primary_mob_id {
992            continue;
993        }
994        let Ok(handle) = state.handle_for(&mob_id).await else {
995            continue;
996        };
997        for member in handle.list_members_including_retiring().await {
998            resolved.push(ResolvedConsoleMember {
999                entry: entry.clone(),
1000                handle: handle.clone(),
1001                runtime_identity: member.agent_identity.to_string(),
1002                member,
1003            });
1004        }
1005    }
1006    resolved
1007}
1008
1009async fn dispatch_message_to_resolved_member(
1010    resolved: &ResolvedConsoleMember,
1011    content: ContentInput,
1012    handling_mode: meerkat_core::types::HandlingMode,
1013) -> Result<String, String> {
1014    let mid = MeerkatId::from(resolved.runtime_identity.as_str());
1015    match send_message_on_mob_with_mode(
1016        &resolved.handle,
1017        &resolved.runtime_identity,
1018        content.clone(),
1019        handling_mode,
1020    )
1021    .await
1022    {
1023        Ok(session_id) => Ok(session_id),
1024        Err(err) if err.to_string().contains("not externally addressable") => {
1025            let member = resolved
1026                .handle
1027                .member(&mid)
1028                .await
1029                .map_err(|err| err.to_string())?;
1030            let _receipt = member
1031                .internal_turn(content)
1032                .await
1033                .map_err(|err| err.to_string())?;
1034            resolved
1035                .handle
1036                .resolve_bridge_session_id(&mid)
1037                .await
1038                .map(|sid| sid.to_string())
1039                .ok_or_else(|| "member has no bridge session after internal turn".to_string())
1040        }
1041        Err(err) => Err(err.to_string()),
1042    }
1043}
1044
1045fn spawn_console_send_dispatch(
1046    inner: Arc<AggregatorInner>,
1047    resolved: ResolvedConsoleMember,
1048    content: ContentInput,
1049    handling_mode: meerkat_core::types::HandlingMode,
1050    dispatching: SendState,
1051    user_frame: ConsoleFrame,
1052    interaction_id: String,
1053) {
1054    tokio::spawn(async move {
1055        match dispatch_message_to_resolved_member(&resolved, content, handling_mode).await {
1056            Ok(_) => {
1057                let _ = dispatching.apply(SendTransition::MarkDelivered);
1058                if let Err(err) = update_frame_status_and_emit(
1059                    &inner,
1060                    &user_frame.id,
1061                    ConsoleFrameStatus::Delivered,
1062                )
1063                .await
1064                {
1065                    tracing::warn!(
1066                        frame_id = %user_frame.id,
1067                        error = %err,
1068                        "failed to update console send delivery status"
1069                    );
1070                }
1071            }
1072            Err(err) => {
1073                let _ = dispatching.apply(SendTransition::MarkDeliveryFailed);
1074                if let Err(update_err) = update_frame_status_and_emit(
1075                    &inner,
1076                    &user_frame.id,
1077                    ConsoleFrameStatus::DeliveryFailed,
1078                )
1079                .await
1080                {
1081                    tracing::warn!(
1082                        frame_id = %user_frame.id,
1083                        error = %update_err,
1084                        "failed to update console send failure status"
1085                    );
1086                }
1087                let failure_frame = NewConsoleFrame {
1088                    id: None,
1089                    dedupe_key: format!("delivery-failed:{}", user_frame.id),
1090                    timestamp_ms: current_time_ms(),
1091                    runtime_key: user_frame.runtime_key,
1092                    identity: user_frame.identity,
1093                    conversation_id: user_frame.conversation_id,
1094                    session_id: user_frame.session_id,
1095                    kind: "message_delivery_failed".to_string(),
1096                    status: ConsoleFrameStatus::DeliveryFailed,
1097                    payload: json!({ "reason": err }),
1098                    source: ConsoleFrameSource {
1099                        kind: ConsoleFrameSourceKind::Synthetic,
1100                        source_cursor: None,
1101                    },
1102                    source_event_id: None,
1103                    interaction_id: Some(interaction_id),
1104                    turn_id: None,
1105                    run_id: None,
1106                    parent_frame_id: Some(user_frame.id.clone()),
1107                    caused_by_frame_id: Some(user_frame.id),
1108                };
1109                if let Err(append_err) = append_and_emit(&inner, failure_frame).await {
1110                    tracing::warn!(
1111                        error = %append_err,
1112                        "failed to append console send failure frame"
1113                    );
1114                }
1115            }
1116        }
1117    });
1118}
1119
/// Errors surfaced by the console send and lookup paths.
///
/// String payloads are interpolated verbatim into the `Display` output.
#[derive(Debug)]
pub enum ConsoleSendError {
    /// The identity did not resolve to a member, or is hidden by the
    /// visibility policy. Carries the requested identity.
    UnknownIdentity(String),
    /// The resolved member is not addressable. Carries the requested identity.
    NotAddressable(String),
    /// The resolved member is retiring. Carries the requested identity.
    Retired(String),
    /// Rejected message content; carries a description of the problem.
    InvalidContent(String),
    /// Rejected handling mode; carries the offending mode.
    InvalidHandlingMode(String),
    /// Malformed request; carries a description of the problem.
    InvalidRequest(String),
    /// Idempotency key conflict; carries the key.
    IdempotencyConflict(String),
    /// Console send state error described by a static message.
    State(&'static str),
    /// Dispatch failure; carries the failure message.
    Dispatch(String),
    /// Failure reported by the console log.
    Log(ConsoleLogError),
}
1133
1134impl std::fmt::Display for ConsoleSendError {
1135    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1136        match self {
1137            Self::UnknownIdentity(identity) => write!(f, "unknown identity: {identity}"),
1138            Self::NotAddressable(identity) => write!(f, "not addressable: {identity}"),
1139            Self::Retired(identity) => write!(f, "identity retired: {identity}"),
1140            Self::InvalidContent(message) => write!(f, "invalid content: {message}"),
1141            Self::InvalidHandlingMode(mode) => write!(f, "invalid handling mode: {mode}"),
1142            Self::InvalidRequest(message) => write!(f, "invalid request: {message}"),
1143            Self::IdempotencyConflict(key) => write!(f, "idempotency key conflict: {key}"),
1144            Self::State(message) => write!(f, "console send state error: {message}"),
1145            Self::Dispatch(message) => write!(f, "dispatch failed: {message}"),
1146            Self::Log(err) => write!(f, "console log error: {err}"),
1147        }
1148    }
1149}
1150
1151impl std::error::Error for ConsoleSendError {}
1152
1153async fn backfill_session_history(
1154    inner: Arc<AggregatorInner>,
1155    runtime_key: String,
1156    force_refresh: bool,
1157) -> ConsoleLogResult<()> {
1158    if !inner.options.session_history_backfill_enabled {
1159        return Ok(());
1160    }
1161    let Some(entry) = inner
1162        .runtimes
1163        .read()
1164        .ok()
1165        .and_then(|entries| entries.get(&runtime_key).cloned())
1166    else {
1167        return Ok(());
1168    };
1169    let members = member_sources_for_entry(&entry).await;
1170    let mut targets = Vec::new();
1171    for resolved in members {
1172        let member = resolved.member;
1173        let Some(record) = identity_record_for_member(&entry, &resolved.handle, &member).await
1174        else {
1175            continue;
1176        };
1177        if !entry.visibility_policy.identity_visible(&record) {
1178            continue;
1179        }
1180        let Some(session_id) = record.session_id.clone() else {
1181            continue;
1182        };
1183        targets.push(SessionBackfillTarget {
1184            entry: entry.clone(),
1185            record,
1186            session_id,
1187        });
1188    }
1189    backfill_session_history_targets(inner, targets, force_refresh).await
1190}
1191
/// One unit of session-history backfill work: a single session of a single
/// identity on a single runtime.
#[derive(Clone)]
struct SessionBackfillTarget {
    /// Runtime registry entry the session is read from.
    entry: RuntimeEntry,
    /// Identity record the backfilled frames are attributed to.
    record: ConsoleIdentityRecord,
    /// Session whose history is read.
    session_id: String,
}
1198
1199async fn backfill_session_history_targets(
1200    inner: Arc<AggregatorInner>,
1201    targets: Vec<SessionBackfillTarget>,
1202    force_refresh: bool,
1203) -> ConsoleLogResult<()> {
1204    let mut tasks = tokio::task::JoinSet::new();
1205    for target in targets {
1206        tasks.spawn(backfill_one_session_history(
1207            inner.clone(),
1208            target,
1209            force_refresh,
1210        ));
1211    }
1212    let mut first_error = None;
1213    while let Some(result) = tasks.join_next().await {
1214        match result {
1215            Ok(Ok(())) => {}
1216            Ok(Err(err)) => {
1217                if first_error.is_none() {
1218                    first_error = Some(err);
1219                }
1220            }
1221            Err(err) => {
1222                if first_error.is_none() {
1223                    first_error = Some(Box::new(std::io::Error::other(format!(
1224                        "session backfill task failed: {err}"
1225                    ))) as ConsoleLogError);
1226                }
1227            }
1228        }
1229    }
1230    if let Some(err) = first_error {
1231        Err(err)
1232    } else {
1233        Ok(())
1234    }
1235}
1236
1237async fn backfill_one_session_history(
1238    inner: Arc<AggregatorInner>,
1239    target: SessionBackfillTarget,
1240    force_refresh: bool,
1241) -> ConsoleLogResult<()> {
1242    let _permit = inner
1243        .session_backfill_permits
1244        .clone()
1245        .acquire_owned()
1246        .await
1247        .map_err(|err| -> ConsoleLogError {
1248            Box::new(std::io::Error::other(format!(
1249                "session backfill limiter closed: {err}"
1250            )))
1251        })?;
1252    let SessionBackfillTarget {
1253        entry,
1254        record,
1255        session_id,
1256    } = target;
1257    let watermark_runtime_key =
1258        session_history_watermark_runtime_key(&entry.runtime_key, &session_id);
1259    let watermark = inner
1260        .store
1261        .source_watermark(
1262            &watermark_runtime_key,
1263            ConsoleFrameSourceKind::SessionHistory,
1264        )
1265        .await?;
1266    let now_ms = current_time_ms();
1267    let mut offset = watermark
1268        .as_deref()
1269        .and_then(|watermark| parse_session_history_watermark(watermark, &session_id))
1270        .unwrap_or(0);
1271    if !force_refresh
1272        && watermark.as_deref().is_some_and(|watermark| {
1273            session_history_watermark_is_fresh(watermark, &session_id, now_ms)
1274        })
1275    {
1276        return Ok(());
1277    }
1278    loop {
1279        let page = match entry
1280            .runtime
1281            .read_session_history(&session_id, offset, Some(SESSION_HISTORY_PAGE_LIMIT))
1282            .await
1283        {
1284            Ok(page) => page,
1285            Err(err) => {
1286                append_backfill_gap(
1287                    &inner,
1288                    &entry.runtime_key,
1289                    &record.identity,
1290                    err.to_string(),
1291                )
1292                .await?;
1293                break;
1294            }
1295        };
1296        let page_value = match serde_json::to_value(page) {
1297            Ok(value) => value,
1298            Err(err) => {
1299                append_backfill_gap(
1300                    &inner,
1301                    &entry.runtime_key,
1302                    &record.identity,
1303                    err.to_string(),
1304                )
1305                .await?;
1306                break;
1307            }
1308        };
1309        let base_offset = page_value
1310            .get("offset")
1311            .and_then(Value::as_u64)
1312            .unwrap_or(offset as u64) as usize;
1313        let Some(messages) = page_value.get("messages").and_then(Value::as_array) else {
1314            append_backfill_gap(
1315                &inner,
1316                &entry.runtime_key,
1317                &record.identity,
1318                "session history page missing messages".to_string(),
1319            )
1320            .await?;
1321            break;
1322        };
1323        if messages.is_empty() {
1324            if offset > 0 {
1325                record_session_history_watermark(
1326                    &inner,
1327                    &watermark_runtime_key,
1328                    &session_id,
1329                    offset,
1330                )
1331                .await?;
1332            }
1333            break;
1334        }
1335        for (idx, message) in messages.iter().enumerate() {
1336            let absolute_offset = base_offset + idx;
1337            let frames = frames_from_session_history_message(
1338                &entry.runtime_key,
1339                &record.identity,
1340                &session_id,
1341                absolute_offset,
1342                message.clone(),
1343            );
1344            for mut frame in frames {
1345                if history_frame_has_existing_counterpart(&inner, &frame).await? {
1346                    continue;
1347                }
1348                if let Some(redacted) = entry.visibility_policy.redact_payload(&frame) {
1349                    frame.payload = redacted;
1350                    frame.status = ConsoleFrameStatus::Redacted;
1351                }
1352                append_and_emit(&inner, frame).await?;
1353            }
1354        }
1355        offset = base_offset + messages.len();
1356        record_session_history_watermark(&inner, &watermark_runtime_key, &session_id, offset)
1357            .await?;
1358        let has_more = page_value
1359            .get("has_more")
1360            .and_then(Value::as_bool)
1361            .unwrap_or(false);
1362        if !has_more || messages.len() < SESSION_HISTORY_PAGE_LIMIT {
1363            break;
1364        }
1365    }
1366    Ok(())
1367}
1368
1369async fn record_session_history_watermark(
1370    inner: &AggregatorInner,
1371    watermark_runtime_key: &str,
1372    session_id: &str,
1373    offset: usize,
1374) -> ConsoleLogResult<()> {
1375    inner
1376        .store
1377        .record_source_watermark(
1378            watermark_runtime_key,
1379            ConsoleFrameSourceKind::SessionHistory,
1380            &format_session_history_watermark(session_id, offset, current_time_ms()),
1381        )
1382        .await
1383}
1384
1385fn spawn_session_history_backfill(inner: Arc<AggregatorInner>, runtime_key: String) {
1386    if !inner.options.session_history_backfill_enabled {
1387        return;
1388    }
1389    tokio::spawn(async move {
1390        {
1391            let mut active = inner.active_session_backfills.lock().await;
1392            if !active.insert(runtime_key.clone()) {
1393                return;
1394            }
1395        }
1396        let result = backfill_session_history(inner.clone(), runtime_key.clone(), false).await;
1397        let mut active = inner.active_session_backfills.lock().await;
1398        active.remove(&runtime_key);
1399        drop(active);
1400        if let Err(err) = result {
1401            tracing::warn!(
1402                runtime_key = %runtime_key,
1403                error = %err,
1404                "console session-history backfill failed"
1405            );
1406        }
1407    });
1408}
1409
1410fn spawn_session_history_discovery_loop(inner: Arc<AggregatorInner>, runtime_key: String) {
1411    if !inner.options.session_history_backfill_enabled {
1412        return;
1413    }
1414    tokio::spawn(async move {
1415        let mut interval = tokio::time::interval(SESSION_HISTORY_DISCOVERY_INTERVAL);
1416        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
1417        loop {
1418            interval.tick().await;
1419            let runtime_still_registered = inner
1420                .runtimes
1421                .read()
1422                .ok()
1423                .is_some_and(|entries| entries.contains_key(&runtime_key));
1424            if !runtime_still_registered {
1425                break;
1426            }
1427            spawn_session_history_backfill(inner.clone(), runtime_key.clone());
1428        }
1429    });
1430}
1431
1432fn spawn_session_history_backfill_target(
1433    inner: Arc<AggregatorInner>,
1434    target: SessionBackfillTarget,
1435    force_refresh: bool,
1436) {
1437    if !inner.options.session_history_backfill_enabled {
1438        return;
1439    }
1440    tokio::spawn(async move {
1441        let active_key = targeted_session_history_active_key(&target, force_refresh);
1442        let result =
1443            run_targeted_session_history_backfill(inner.clone(), target, force_refresh).await;
1444        if let Err(err) = result {
1445            tracing::warn!(
1446                active_key = %active_key,
1447                error = %err,
1448                "console targeted session-history backfill failed"
1449            );
1450        }
1451    });
1452}
1453
1454async fn run_targeted_session_history_backfill(
1455    inner: Arc<AggregatorInner>,
1456    target: SessionBackfillTarget,
1457    force_refresh: bool,
1458) -> ConsoleLogResult<()> {
1459    let active_key = targeted_session_history_active_key(&target, force_refresh);
1460    {
1461        let mut active = inner.active_session_backfills.lock().await;
1462        if !active.insert(active_key.clone()) {
1463            return Ok(());
1464        }
1465    }
1466    let result = backfill_one_session_history(inner.clone(), target, force_refresh).await;
1467    let mut active = inner.active_session_backfills.lock().await;
1468    active.remove(&active_key);
1469    result
1470}
1471
1472fn targeted_session_history_active_key(
1473    target: &SessionBackfillTarget,
1474    force_refresh: bool,
1475) -> String {
1476    let mode = if force_refresh { "force" } else { "refresh" };
1477    format!(
1478        "{}:session-history:{}:{mode}",
1479        target.entry.runtime_key, target.session_id
1480    )
1481}
1482
1483fn spawn_session_history_backfill_for_identity(
1484    inner: Arc<AggregatorInner>,
1485    identity: String,
1486    force_refresh: bool,
1487) {
1488    if !inner.options.session_history_backfill_enabled {
1489        return;
1490    }
1491    tokio::spawn(async move {
1492        let Some(target) = session_backfill_target_for_identity(&inner, &identity).await else {
1493            return;
1494        };
1495        spawn_session_history_backfill_target(inner, target, force_refresh);
1496    });
1497}
1498
1499async fn backfill_identity_for_explicit_query(inner: Arc<AggregatorInner>, identity: String) {
1500    if !inner.options.session_history_backfill_enabled {
1501        return;
1502    }
1503    let Some(target) = session_backfill_target_for_identity(&inner, &identity).await else {
1504        return;
1505    };
1506    let (tx, rx) = oneshot::channel();
1507    tokio::spawn(async move {
1508        let result = run_targeted_session_history_backfill(inner, target, true)
1509            .await
1510            .map_err(|err| err.to_string());
1511        let _ = tx.send(result);
1512    });
1513    match tokio::time::timeout(EXPLICIT_IDENTITY_BACKFILL_WAIT, rx).await {
1514        Ok(Ok(Ok(())) | Err(_)) | Err(_) => {}
1515        Ok(Ok(Err(err))) => {
1516            tracing::warn!(
1517                identity = %identity,
1518                error = %err,
1519                "console explicit identity session-history refresh failed"
1520            );
1521        }
1522    }
1523}
1524
1525fn spawn_opportunistic_session_history_backfill_for_identity(
1526    inner: Arc<AggregatorInner>,
1527    identity: String,
1528) {
1529    if !inner.options.session_history_backfill_enabled {
1530        return;
1531    }
1532    tokio::spawn(async move {
1533        let Some(target) = session_backfill_target_for_identity(&inner, &identity).await else {
1534            return;
1535        };
1536        let active_key = format!(
1537            "{}:session-history:{}",
1538            target.entry.runtime_key, target.session_id
1539        );
1540        {
1541            let mut seen = inner.opportunistic_session_backfills.lock().await;
1542            if !seen.insert(active_key) {
1543                return;
1544            }
1545        }
1546        spawn_session_history_backfill_target(inner, target, false);
1547    });
1548}
1549
1550async fn session_backfill_target_for_identity(
1551    inner: &AggregatorInner,
1552    identity: &str,
1553) -> Option<SessionBackfillTarget> {
1554    let entries = inner
1555        .runtimes
1556        .read()
1557        .ok()
1558        .map(|entries| entries.clone())
1559        .unwrap_or_default();
1560    for entry in entries.values() {
1561        let Some(raw_identity) = strip_namespace(identity, &entry.identity_namespace) else {
1562            continue;
1563        };
1564        let mid = MeerkatId::from(raw_identity.as_str());
1565        for resolved in member_sources_for_entry(entry)
1566            .await
1567            .into_iter()
1568            .filter(|candidate| candidate.member.agent_identity == mid)
1569        {
1570            let Some(record) =
1571                identity_record_for_member(entry, &resolved.handle, &resolved.member).await
1572            else {
1573                continue;
1574            };
1575            if !entry.visibility_policy.identity_visible(&record) {
1576                continue;
1577            }
1578            let Some(session_id) = record.session_id.clone() else {
1579                continue;
1580            };
1581            return Some(SessionBackfillTarget {
1582                entry: entry.clone(),
1583                record,
1584                session_id,
1585            });
1586        }
1587    }
1588    None
1589}
1590
1591async fn recover_lagged_source_events(
1592    inner: Arc<AggregatorInner>,
1593    runtime_key: &str,
1594    console_events: &ConsoleEventStore,
1595) -> ConsoleLogResult<()> {
1596    let watermark = inner
1597        .store
1598        .source_watermark(runtime_key, ConsoleFrameSourceKind::ConsoleEvent)
1599        .await?;
1600    match console_events.replay_all(watermark.as_deref()).await {
1601        Ok(events) => {
1602            for envelope in events {
1603                project_console_event(inner.clone(), runtime_key, envelope).await?;
1604            }
1605        }
1606        Err(err) => {
1607            append_source_gap(
1608                &inner,
1609                runtime_key,
1610                format!(
1611                    "{}:{}:{}",
1612                    err.error, err.stream, err.requested_last_event_id
1613                ),
1614            )
1615            .await?;
1616        }
1617    }
1618    Ok(())
1619}
1620
1621async fn append_source_gap(
1622    inner: &AggregatorInner,
1623    runtime_key: &str,
1624    reason: String,
1625) -> ConsoleLogResult<()> {
1626    append_and_emit(
1627        inner,
1628        NewConsoleFrame {
1629            id: None,
1630            dedupe_key: format!("source-gap:{runtime_key}:{}", current_time_ms()),
1631            timestamp_ms: current_time_ms(),
1632            runtime_key: runtime_key.to_string(),
1633            identity: "__console__".to_string(),
1634            conversation_id: None,
1635            session_id: None,
1636            kind: "replay_unavailable".to_string(),
1637            status: ConsoleFrameStatus::DeliveryFailed,
1638            payload: json!({
1639                "reason": reason,
1640                "source_kind": "console_event",
1641            }),
1642            source: ConsoleFrameSource {
1643                kind: ConsoleFrameSourceKind::Synthetic,
1644                source_cursor: None,
1645            },
1646            source_event_id: None,
1647            interaction_id: None,
1648            turn_id: None,
1649            run_id: None,
1650            parent_frame_id: None,
1651            caused_by_frame_id: None,
1652        },
1653    )
1654    .await?;
1655    let _ = inner
1656        .event_tx
1657        .send(ConsoleTimelineEvent::ReplayUnavailable {
1658            requested_cursor: format!("source-gap:{runtime_key}"),
1659            latest_cursor: inner.store.latest_cursor().await.ok().flatten(),
1660        });
1661    Ok(())
1662}
1663
1664async fn append_backfill_gap(
1665    inner: &AggregatorInner,
1666    runtime_key: &str,
1667    identity: &str,
1668    reason: String,
1669) -> ConsoleLogResult<()> {
1670    append_and_emit(
1671        inner,
1672        NewConsoleFrame {
1673            id: None,
1674            dedupe_key: format!(
1675                "session-backfill-gap:{runtime_key}:{identity}:{}",
1676                current_time_ms()
1677            ),
1678            timestamp_ms: current_time_ms(),
1679            runtime_key: runtime_key.to_string(),
1680            identity: identity.to_string(),
1681            conversation_id: Some(identity.to_string()),
1682            session_id: None,
1683            kind: "replay_unavailable".to_string(),
1684            status: ConsoleFrameStatus::DeliveryFailed,
1685            payload: json!({
1686                "reason": reason,
1687                "source_kind": "session_history",
1688            }),
1689            source: ConsoleFrameSource {
1690                kind: ConsoleFrameSourceKind::Synthetic,
1691                source_cursor: None,
1692            },
1693            source_event_id: None,
1694            interaction_id: None,
1695            turn_id: None,
1696            run_id: None,
1697            parent_frame_id: None,
1698            caused_by_frame_id: None,
1699        },
1700    )
1701    .await?;
1702    Ok(())
1703}
1704
1705async fn project_console_event(
1706    inner: Arc<AggregatorInner>,
1707    runtime_key: &str,
1708    envelope: crate::console_contracts::ConsoleIdentityEventEnvelope,
1709) -> ConsoleLogResult<()> {
1710    let Some(entry) = inner
1711        .runtimes
1712        .read()
1713        .ok()
1714        .and_then(|entries| entries.get(runtime_key).cloned())
1715    else {
1716        return Ok(());
1717    };
1718    let mut frame = frame_from_console_event(&entry, envelope);
1719    if let Some(redacted) = entry.visibility_policy.redact_payload(&frame) {
1720        frame.payload = redacted;
1721        frame.status = ConsoleFrameStatus::Redacted;
1722    }
1723    let source_cursor = frame
1724        .source_event_id
1725        .clone()
1726        .unwrap_or_else(|| frame.dedupe_key.clone());
1727    let refresh_identity = if console_event_should_refresh_session_history(&frame) {
1728        Some(frame.identity.clone())
1729    } else {
1730        None
1731    };
1732    let opportunistic_refresh_identity = if refresh_identity.is_none()
1733        && console_event_should_start_session_history_backfill(&frame)
1734    {
1735        Some(frame.identity.clone())
1736    } else {
1737        None
1738    };
1739    append_and_emit(&inner, frame).await?;
1740    inner
1741        .store
1742        .record_source_watermark(
1743            &entry.runtime_key,
1744            ConsoleFrameSourceKind::ConsoleEvent,
1745            &source_cursor,
1746        )
1747        .await?;
1748    if let Some(identity) = refresh_identity {
1749        spawn_session_history_backfill_for_identity(inner.clone(), identity, true);
1750    } else if let Some(identity) = opportunistic_refresh_identity {
1751        spawn_opportunistic_session_history_backfill_for_identity(inner.clone(), identity);
1752    }
1753    Ok(())
1754}
1755
1756fn console_event_should_refresh_session_history(frame: &NewConsoleFrame) -> bool {
1757    matches!(
1758        frame.kind.as_str(),
1759        "interaction_complete" | "interaction_failed" | "message_delivery_failed"
1760    ) || frame.session_id.is_some()
1761}
1762
1763fn console_event_should_start_session_history_backfill(frame: &NewConsoleFrame) -> bool {
1764    if frame.identity == SYSTEM_EVENT_IDENTITY {
1765        return false;
1766    }
1767    matches!(
1768        frame.kind.as_str(),
1769        "turn_started"
1770            | "run_started"
1771            | "reasoning_complete"
1772            | "tool_call_requested"
1773            | "tool_call"
1774            | "tool_execution_started"
1775            | "text_delta"
1776            | "system_notice"
1777    )
1778}
1779
1780async fn append_and_emit(
1781    inner: &AggregatorInner,
1782    frame: NewConsoleFrame,
1783) -> ConsoleLogResult<AppendOutcome> {
1784    let outcome = inner.store.append_if_absent(frame).await?;
1785    if outcome.disposition == AppendDisposition::Inserted {
1786        let _ = inner.event_tx.send(ConsoleTimelineEvent::ConsoleFrame {
1787            frame: outcome.frame.clone(),
1788        });
1789    }
1790    Ok(outcome)
1791}
1792
1793async fn update_frame_status_and_emit(
1794    inner: &AggregatorInner,
1795    frame_id: &str,
1796    status: ConsoleFrameStatus,
1797) -> ConsoleLogResult<Option<ConsoleFrame>> {
1798    let Some(updated) = inner.store.update_frame_status(frame_id, status).await? else {
1799        return Ok(None);
1800    };
1801    let update_marker = NewConsoleFrame {
1802        id: None,
1803        dedupe_key: format!("frame-update:{}:{}", updated.id, updated.frame_version),
1804        timestamp_ms: updated.updated_at_ms.unwrap_or_else(current_time_ms),
1805        runtime_key: updated.runtime_key.clone(),
1806        identity: updated.identity.clone(),
1807        conversation_id: updated.conversation_id.clone(),
1808        session_id: updated.session_id.clone(),
1809        kind: "frame_updated".to_string(),
1810        status: updated.status,
1811        payload: json!({ "frame": updated.clone() }),
1812        source: ConsoleFrameSource {
1813            kind: ConsoleFrameSourceKind::Synthetic,
1814            source_cursor: None,
1815        },
1816        source_event_id: None,
1817        interaction_id: updated.interaction_id.clone(),
1818        turn_id: updated.turn_id.clone(),
1819        run_id: updated.run_id.clone(),
1820        parent_frame_id: Some(updated.id.clone()),
1821        caused_by_frame_id: Some(updated.id.clone()),
1822    };
1823    let outcome = inner.store.append_if_absent(update_marker).await?;
1824    if outcome.disposition == AppendDisposition::Inserted {
1825        let _ = inner.event_tx.send(ConsoleTimelineEvent::ConsoleFrame {
1826            frame: outcome.frame,
1827        });
1828    }
1829    Ok(Some(updated))
1830}
1831
1832fn frame_from_console_event(
1833    entry: &RuntimeEntry,
1834    envelope: crate::console_contracts::ConsoleIdentityEventEnvelope,
1835) -> NewConsoleFrame {
1836    let turn_id = envelope
1837        .data
1838        .get("turn_id")
1839        .and_then(Value::as_str)
1840        .map(ToString::to_string);
1841    let run_id = envelope
1842        .data
1843        .get("run_id")
1844        .and_then(Value::as_str)
1845        .map(ToString::to_string);
1846    let status = match envelope.event_type.as_str() {
1847        "interaction_started" => ConsoleFrameStatus::Accepted,
1848        "interaction_failed" | "run_failed" => ConsoleFrameStatus::DeliveryFailed,
1849        "interaction_complete" | "run_completed" => ConsoleFrameStatus::Completed,
1850        _ => ConsoleFrameStatus::Delivered,
1851    };
1852    let identity = apply_namespace(&envelope.identity, &entry.identity_namespace);
1853    NewConsoleFrame {
1854        id: Some(envelope.event_id.clone()),
1855        dedupe_key: format!("console-event:{}:{}", entry.runtime_key, envelope.event_id),
1856        timestamp_ms: envelope.timestamp_ms,
1857        runtime_key: entry.runtime_key.clone(),
1858        identity: identity.clone(),
1859        conversation_id: Some(identity),
1860        session_id: envelope
1861            .data
1862            .get("session_id")
1863            .and_then(Value::as_str)
1864            .map(ToString::to_string),
1865        kind: envelope.event_type,
1866        status,
1867        payload: envelope.data,
1868        source: ConsoleFrameSource {
1869            kind: ConsoleFrameSourceKind::ConsoleEvent,
1870            source_cursor: None,
1871        },
1872        source_event_id: Some(envelope.event_id),
1873        interaction_id: envelope.interaction_id,
1874        turn_id,
1875        run_id,
1876        parent_frame_id: None,
1877        caused_by_frame_id: None,
1878    }
1879}
1880
/// Test-only convenience: projects a session-history message and returns only
/// the first resulting frame, if any.
#[cfg(test)]
fn frame_from_session_history_message(
    runtime_key: &str,
    identity: &str,
    session_id: &str,
    offset: usize,
    message: Value,
) -> Option<NewConsoleFrame> {
    let mut projected =
        frames_from_session_history_message(runtime_key, identity, session_id, offset, message)
            .into_iter();
    projected.next()
}
1893
/// Converts one serialized session-history message into zero or more console
/// frames attributed to `identity` / `session_id` on `runtime_key`.
///
/// * Returns an empty vector when `message` does not deserialize into a
///   [`Message`], when it is a `System` message, or when it is a user message
///   recognised as scaffold noise.
/// * A `ToolResults` message yields one `tool_execution_completed` frame per
///   result; every other supported variant yields exactly one frame.
///
/// `offset` is the position of the message within the session history; it is
/// embedded in both the dedupe key and the source cursor of each frame.
fn frames_from_session_history_message(
    runtime_key: &str,
    identity: &str,
    session_id: &str,
    offset: usize,
    message: Value,
) -> Vec<NewConsoleFrame> {
    // Short hash of the raw JSON message; part of every dedupe key built below.
    let payload_hash = hash_short(&serde_json::to_string(&message).unwrap_or_default());
    let Some(parsed) = serde_json::from_value::<Message>(message.clone()).ok() else {
        return Vec::new();
    };
    // Tool results fan out into one frame per result, so they are handled first.
    if let Message::ToolResults {
        results,
        created_at,
    } = parsed
    {
        return results
            .into_iter()
            .enumerate()
            .map(|(idx, result)| {
                let content = serde_json::to_value(&result.content).unwrap_or(Value::Null);
                let result_text = result.text_content();
                let tool_use_id = result.tool_use_id.clone();
                NewConsoleFrame {
                    id: None,
                    // `idx` keeps the key unique per result within one message.
                    dedupe_key: format!(
                        "session-history:{runtime_key}:{session_id}:{offset}:{idx}:{payload_hash}"
                    ),
                    // Negative timestamps are clamped to zero before the cast.
                    timestamp_ms: created_at.timestamp_millis().max(0) as u64,
                    runtime_key: runtime_key.to_string(),
                    identity: identity.to_string(),
                    conversation_id: Some(identity.to_string()),
                    session_id: Some(session_id.to_string()),
                    kind: "tool_execution_completed".to_string(),
                    status: ConsoleFrameStatus::Completed,
                    payload: json!({
                        "id": tool_use_id,
                        "tool_call_id": tool_use_id,
                        "result": result_text,
                        "content": content,
                        "is_error": result.is_error,
                        "source_event_type": "session_history",
                        "type": "session_history",
                    }),
                    source: ConsoleFrameSource {
                        kind: ConsoleFrameSourceKind::SessionHistory,
                        source_cursor: Some(format!("{session_id}:{offset}:{idx}")),
                    },
                    source_event_id: None,
                    interaction_id: None,
                    turn_id: None,
                    run_id: None,
                    parent_frame_id: None,
                    caused_by_frame_id: None,
                }
            })
            .collect();
    }
    // Every remaining variant maps to a single (kind, timestamp, payload) triple.
    let (kind, timestamp_ms, payload) = match &parsed {
        Message::User(user) => {
            // Scaffold user messages produce no frame.
            if session_history_user_message_is_scaffold(&message) {
                return Vec::new();
            }
            (
                "user_input",
                user.created_at.timestamp_millis().max(0) as u64,
                json!({
                    "content": user.content,
                    "message": message,
                }),
            )
        }
        Message::Assistant(assistant) => (
            "interaction_complete",
            assistant.created_at.timestamp_millis().max(0) as u64,
            json!({
                "result": assistant.content,
                "text": assistant.content,
                "message": message,
                "source_event_type": "session_history",
                "type": "session_history",
            }),
        ),
        Message::BlockAssistant(assistant) => {
            // Text blocks are concatenated without a separator.
            let text = assistant.text_blocks().collect::<Vec<_>>().join("");
            (
                "interaction_complete",
                assistant.created_at.timestamp_millis().max(0) as u64,
                json!({
                    "result": text,
                    "text": text,
                    "message": message,
                    "source_event_type": "session_history",
                    "type": "session_history",
                }),
            )
        }
        Message::SystemNotice(notice) => (
            "system_notice",
            notice.created_at.timestamp_millis().max(0) as u64,
            json!({
                "message": message,
                "kind": notice.kind,
                "render_class": notice.kind.render_class(),
                "body": notice.body,
                "blocks": notice.blocks,
                "source_event_type": "session_history",
                "type": "session_history",
            }),
        ),
        // `ToolResults` already returned above; it is listed to keep the match
        // exhaustive.
        Message::System(_) | Message::ToolResults { .. } => return Vec::new(),
    };
    vec![NewConsoleFrame {
        id: None,
        dedupe_key: format!("session-history:{runtime_key}:{session_id}:{offset}:{payload_hash}"),
        timestamp_ms,
        runtime_key: runtime_key.to_string(),
        identity: identity.to_string(),
        conversation_id: Some(identity.to_string()),
        session_id: Some(session_id.to_string()),
        kind: kind.to_string(),
        status: ConsoleFrameStatus::Completed,
        payload,
        source: ConsoleFrameSource {
            kind: ConsoleFrameSourceKind::SessionHistory,
            source_cursor: Some(format!("{session_id}:{offset}")),
        },
        source_event_id: None,
        interaction_id: None,
        turn_id: None,
        run_id: None,
        parent_frame_id: None,
        caused_by_frame_id: None,
    }]
}
2029
2030fn session_history_user_message_is_scaffold(message: &Value) -> bool {
2031    message
2032        .get("content")
2033        .is_some_and(scaffold_message_content_is_noise)
2034}
2035
2036fn scaffold_message_content_is_noise(value: &Value) -> bool {
2037    match value {
2038        Value::String(text) => scaffold_message_text_is_noise(text),
2039        Value::Array(items) => items.iter().any(scaffold_message_content_is_noise),
2040        Value::Object(map) => ["text", "content", "message"]
2041            .iter()
2042            .filter_map(|key| map.get(*key))
2043            .any(scaffold_message_content_is_noise),
2044        _ => false,
2045    }
2046}
2047
/// Returns `true` when `text`, ignoring leading whitespace, starts with the
/// literal `[PEER UPDATE]` marker (case-sensitive) or with the phrase
/// `you have been spawned` (ASCII case-insensitive).
fn scaffold_message_text_is_noise(text: &str) -> bool {
    const PEER_UPDATE_PREFIX: &str = "[PEER UPDATE]";
    const SPAWN_PREFIX: &[u8] = b"you have been spawned";

    let trimmed = text.trim_start();
    // Compare only the leading bytes instead of lowercasing (and allocating) the
    // whole message. The phrase is pure ASCII, so a byte-wise comparison can only
    // succeed when the message really starts with it.
    trimmed.starts_with(PEER_UPDATE_PREFIX)
        || trimmed
            .as_bytes()
            .get(..SPAWN_PREFIX.len())
            .is_some_and(|head| head.eq_ignore_ascii_case(SPAWN_PREFIX))
}
2055
/// Extracts the offset from a watermark of the form `<session_id>:<offset>[:…]`.
///
/// Returns `None` when the watermark does not start with `session_id` followed by
/// `:`, or when the offset field is not a valid `usize`.
fn parse_session_history_watermark(watermark: &str, session_id: &str) -> Option<usize> {
    let after_session = watermark.strip_prefix(session_id)?;
    let fields = after_session.strip_prefix(':')?;
    // Only the first `:`-separated field is the offset; anything after is ignored.
    let offset_field = match fields.split_once(':') {
        Some((offset, _)) => offset,
        None => fields,
    };
    offset_field.parse().ok()
}
2060
/// Formats a session-history watermark as `<session_id>:<offset>:<checked_at_ms>`.
fn format_session_history_watermark(session_id: &str, offset: usize, checked_at_ms: u64) -> String {
    let offset_field = offset.to_string();
    let checked_at_field = checked_at_ms.to_string();
    [session_id, offset_field.as_str(), checked_at_field.as_str()].join(":")
}
2064
2065fn session_history_watermark_is_fresh(watermark: &str, session_id: &str, now_ms: u64) -> bool {
2066    let Some(checked_at_ms) = watermark
2067        .rsplit_once(':')
2068        .and_then(|(_, checked_at_ms)| checked_at_ms.parse::<u64>().ok())
2069    else {
2070        return false;
2071    };
2072    let offset = parse_session_history_watermark(watermark, session_id).unwrap_or(0);
2073    let ttl_ms = if offset > 0 {
2074        SESSION_HISTORY_GROWING_REFRESH_TTL_MS
2075    } else {
2076        SESSION_HISTORY_REFRESH_TTL_MS
2077    };
2078    now_ms.saturating_sub(checked_at_ms) < ttl_ms
2079}
2080
/// Checks whether the store already holds a non-session-history frame that
/// carries the same transcript content as the session-history `frame`.
///
/// Returns `Ok(false)` immediately when `frame` has no transcript fingerprint.
/// Otherwise it pages through stored frames for the same identity and
/// conversation and returns `Ok(true)` on the first match, either by equal
/// fingerprint or, for assistant terminal frames, by the accumulated text deltas
/// of one turn normalizing to the same fingerprint. Store errors are propagated.
async fn history_frame_has_existing_counterpart(
    inner: &AggregatorInner,
    frame: &NewConsoleFrame,
) -> ConsoleLogResult<bool> {
    let fingerprint = transcript_fingerprint(&frame.kind, &frame.payload);
    let Some(fingerprint) = fingerprint else {
        return Ok(false);
    };
    // Only assistant terminal frames are compared against aggregated deltas.
    let assistant_terminal = assistant_terminal_fingerprint(&frame.kind, &frame.payload).is_some();
    // Running concatenation of text deltas, keyed per turn, across all pages.
    let mut delta_text_by_turn = BTreeMap::<String, String>::new();
    let mut after = None;
    loop {
        let page = inner
            .store
            .query_frames(ConsoleTimelineQuery {
                identity: Some(frame.identity.clone()),
                conversation_id: frame.conversation_id.clone(),
                after,
                limit: 1_000,
            })
            .await?;
        for existing in &page.frames {
            // A missing session id on either side is treated as compatible.
            let same_session = existing.session_id == frame.session_id
                || existing.session_id.is_none()
                || frame.session_id.is_none();
            // Other session-history frames are not counterparts.
            if existing.source.kind == ConsoleFrameSourceKind::SessionHistory || !same_session {
                continue;
            }
            if transcript_fingerprint(&existing.kind, &existing.payload).as_ref()
                == Some(&fingerprint)
            {
                return Ok(true);
            }
            if assistant_terminal
                && let Some(delta) = text_delta_payload_text(&existing.kind, &existing.payload)
            {
                // Group deltas by interaction id, then turn id, then run id, with a
                // shared "session" bucket as the last resort.
                let turn_key = existing
                    .interaction_id
                    .as_deref()
                    .or(existing.turn_id.as_deref())
                    .or(existing.run_id.as_deref())
                    .unwrap_or("session");
                let aggregated = delta_text_by_turn.entry(turn_key.to_string()).or_default();
                aggregated.push_str(delta);
                if normalize_transcript_fingerprint_text(aggregated) == fingerprint {
                    return Ok(true);
                }
            }
        }
        // Stop when the page was empty or the store offers no further cursor.
        if page.frames.is_empty() || page.next_cursor.is_none() {
            return Ok(false);
        }
        after = page.next_cursor;
    }
}
2136
/// Builds the watermark runtime key `<runtime_key>:session-history:<session_id>`.
fn session_history_watermark_runtime_key(runtime_key: &str, session_id: &str) -> String {
    [runtime_key, "session-history", session_id].join(":")
}
2140
2141fn transcript_fingerprint(kind: &str, payload: &Value) -> Option<String> {
2142    match kind {
2143        "user_input" | "interaction_started" => payload
2144            .get("content")
2145            .map(stable_value_fingerprint)
2146            .or_else(|| payload.get("message").map(stable_value_fingerprint)),
2147        "text_delta" => {
2148            text_delta_payload_text(kind, payload).map(normalize_transcript_fingerprint_text)
2149        }
2150        "text_complete" | "interaction_complete" | "run_completed" => {
2151            assistant_terminal_fingerprint(kind, payload)
2152        }
2153        _ => None,
2154    }
2155}
2156
2157fn assistant_terminal_fingerprint(kind: &str, payload: &Value) -> Option<String> {
2158    match kind {
2159        "text_complete" | "interaction_complete" | "run_completed" => payload
2160            .get("text")
2161            .or_else(|| payload.get("result"))
2162            .or_else(|| payload.get("content"))
2163            .map(stable_value_fingerprint),
2164        _ => None,
2165    }
2166}
2167
2168fn text_delta_payload_text<'a>(kind: &str, payload: &'a Value) -> Option<&'a str> {
2169    if kind != "text_delta" {
2170        return None;
2171    }
2172    payload
2173        .get("delta")
2174        .or_else(|| payload.get("text"))
2175        .or_else(|| payload.get("content"))
2176        .and_then(Value::as_str)
2177        .or_else(|| payload.as_str())
2178}
2179
2180fn stable_value_fingerprint(value: &Value) -> String {
2181    match value {
2182        Value::String(text) => normalize_transcript_fingerprint_text(text),
2183        other => serde_json::to_string(other).unwrap_or_default(),
2184    }
2185}
2186
/// Normalizes transcript text for comparison: surrounding whitespace is removed,
/// then a single leading `[EVENT via rpc] ` marker (if present) is dropped and
/// the remainder trimmed again.
fn normalize_transcript_fingerprint_text(text: &str) -> String {
    const RPC_EVENT_PREFIX: &str = "[EVENT via rpc] ";
    let body = text.trim();
    let body = match body.strip_prefix(RPC_EVENT_PREFIX) {
        Some(stripped) => stripped.trim(),
        None => body,
    };
    body.to_owned()
}
2195
/// Per-identity visibility verdict memoized by `frame_is_visible_cached`, keyed
/// by `(runtime_key, identity)`.
#[derive(Debug, Clone, Copy)]
enum CachedIdentityVisibility {
    /// A matching member was found and its identity record passes the runtime's
    /// visibility policy.
    Visible,
    /// A matching member was found, but its identity record is rejected by the
    /// policy or could not be produced.
    Hidden,
    /// No current member of the runtime matches the identity.
    Missing,
}
2202
2203async fn frame_is_visible(
2204    inner: &AggregatorInner,
2205    frame: &ConsoleFrame,
2206    allow_historical_identity: bool,
2207) -> ConsoleLogResult<bool> {
2208    let mut identity_visibility_cache = HashMap::new();
2209    frame_is_visible_cached(
2210        inner,
2211        frame,
2212        allow_historical_identity,
2213        &mut identity_visibility_cache,
2214    )
2215    .await
2216}
2217
/// Decides whether `frame` may be shown, memoizing per-identity verdicts in
/// `identity_visibility_cache` (keyed by runtime key and identity).
///
/// * With an empty runtime registry every frame is visible.
/// * A frame whose runtime is not registered is not visible.
/// * For identities other than `"__console__"`, the identity is resolved against
///   the runtime's current members. A hidden identity hides the frame; an
///   identity with no matching member is shown only when
///   `allow_historical_identity` is set and the policy accepts the frame.
/// * Otherwise the runtime's visibility policy decides for the frame itself.
///
/// Returns an error when the runtime registry lock is poisoned.
async fn frame_is_visible_cached(
    inner: &AggregatorInner,
    frame: &ConsoleFrame,
    allow_historical_identity: bool,
    identity_visibility_cache: &mut HashMap<(String, String), CachedIdentityVisibility>,
) -> ConsoleLogResult<bool> {
    // The registry guard lives only inside this block, so it is released before
    // any of the awaits below.
    let entry = {
        let entries = inner
            .runtimes
            .read()
            .map_err(|_| runtime_registry_lock_error())?;
        if entries.is_empty() {
            return Ok(true);
        }
        let Some(entry) = entries.get(&frame.runtime_key) else {
            return Ok(false);
        };
        entry.clone()
    };
    if frame.identity != "__console__" {
        let cache_key = (frame.runtime_key.clone(), frame.identity.clone());
        let identity_visibility =
            if let Some(cached) = identity_visibility_cache.get(&cache_key).copied() {
                cached
            } else {
                // Frames carry the namespaced identity; members are matched on the
                // un-namespaced id (falling back to the identity as-is when the
                // namespace prefix is absent).
                let runtime_member_id = strip_namespace(&frame.identity, &entry.identity_namespace)
                    .unwrap_or_else(|| frame.identity.clone());
                let runtime_member = MeerkatId::from(runtime_member_id.as_str());
                let visibility = match member_sources_for_entry(&entry)
                    .await
                    .into_iter()
                    .find(|member| member.member.agent_identity == runtime_member)
                {
                    Some(resolved) => {
                        match identity_record_for_member(&entry, &resolved.handle, &resolved.member)
                            .await
                        {
                            Some(record) if entry.visibility_policy.identity_visible(&record) => {
                                CachedIdentityVisibility::Visible
                            }
                            Some(_) => CachedIdentityVisibility::Hidden,
                            None => CachedIdentityVisibility::Hidden,
                        }
                    }
                    None => CachedIdentityVisibility::Missing,
                };
                identity_visibility_cache.insert(cache_key, visibility);
                visibility
            };
        match identity_visibility {
            // Visible identities fall through to the frame-level policy check.
            CachedIdentityVisibility::Visible => {}
            CachedIdentityVisibility::Hidden => return Ok(false),
            CachedIdentityVisibility::Missing => {
                return Ok(
                    allow_historical_identity && entry.visibility_policy.frame_visible(frame)
                );
            }
        }
    }
    Ok(entry.visibility_policy.frame_visible(frame))
}
2279
2280async fn identity_record_for_member(
2281    entry: &RuntimeEntry,
2282    handle: &MobHandle,
2283    member: &MobMemberListEntry,
2284) -> Option<ConsoleIdentityRecord> {
2285    let runtime_member_id = member.agent_identity.to_string();
2286    let durable_identity = member
2287        .labels
2288        .get("agent_identity")
2289        .filter(|value| !value.trim().is_empty())
2290        .map_or(runtime_member_id.as_str(), String::as_str);
2291    let identity = apply_namespace(durable_identity, &entry.identity_namespace);
2292    let addressable = member_is_addressable(member);
2293    let visibility = if member.state == meerkat_mob::MemberState::Retiring {
2294        ConsoleVisibility::RetiredReadable
2295    } else if addressable {
2296        ConsoleVisibility::Addressable
2297    } else {
2298        ConsoleVisibility::Hidden
2299    };
2300    let session_id = handle
2301        .resolve_bridge_session_id(&member.agent_identity)
2302        .await
2303        .map(|sid| sid.to_string());
2304    let display_name = member
2305        .labels
2306        .get("display_name")
2307        .cloned()
2308        .unwrap_or_else(|| runtime_member_id.clone());
2309    let mut labels = member.labels.clone();
2310    labels
2311        .entry("role".to_string())
2312        .or_insert_with(|| member.role.to_string());
2313    Some(ConsoleIdentityRecord {
2314        identity,
2315        display_name,
2316        runtime_key: entry.runtime_key.clone(),
2317        runtime_member_id,
2318        session_id,
2319        visibility,
2320        addressable,
2321        health: if addressable {
2322            "ready"
2323        } else {
2324            "hidden_by_policy"
2325        }
2326        .to_string(),
2327        labels,
2328    })
2329}
2330
/// Returns `true` when the member's role is `delegate` (ASCII case-insensitive)
/// and its labels contain a `source_mob_id` key.
pub(crate) fn is_implicit_delegate_member(
    role: &str,
    labels: &std::collections::BTreeMap<String, String>,
) -> bool {
    if !labels.contains_key("source_mob_id") {
        return false;
    }
    role.eq_ignore_ascii_case("delegate")
}
2337
2338fn member_is_addressable(member: &MobMemberListEntry) -> bool {
2339    member
2340        .labels
2341        .get("addressable")
2342        .map(|value| !value.eq_ignore_ascii_case("false"))
2343        .unwrap_or(true)
2344}
2345
/// Prefixes `identity` with `namespace` as `<namespace>/<identity>`.
///
/// The namespace is trimmed of surrounding whitespace and then of leading and
/// trailing `/`; when nothing remains, `identity` is returned unchanged.
fn apply_namespace(identity: &str, namespace: &str) -> String {
    match namespace.trim().trim_matches('/') {
        "" => identity.to_owned(),
        prefix => [prefix, identity].join("/"),
    }
}
2354
/// Removes the `<namespace>/` prefix from `identity`.
///
/// The namespace is normalized the same way as in `apply_namespace`. With an
/// empty namespace the identity is returned as-is; otherwise `None` is returned
/// when the identity does not start with the namespace followed by `/`.
fn strip_namespace(identity: &str, namespace: &str) -> Option<String> {
    let prefix = namespace.trim().trim_matches('/');
    let member = if prefix.is_empty() {
        identity
    } else {
        identity.strip_prefix(prefix)?.strip_prefix('/')?
    };
    Some(member.to_owned())
}
2365
2366fn validate_send_request(request: &ConsoleSendRequest) -> Result<(), ConsoleSendError> {
2367    if request.identity.trim().is_empty() {
2368        return Err(ConsoleSendError::InvalidRequest(
2369            "identity must be non-empty".to_string(),
2370        ));
2371    }
2372    if request.origin.trim().is_empty() {
2373        return Err(ConsoleSendError::InvalidRequest(
2374            "origin must be non-empty".to_string(),
2375        ));
2376    }
2377    if request.idempotency_key.trim().is_empty() {
2378        return Err(ConsoleSendError::InvalidRequest(
2379            "idempotency_key must be non-empty".to_string(),
2380        ));
2381    }
2382    Ok(())
2383}
2384
2385fn content_input_from_value(value: &Value) -> Result<ContentInput, ConsoleSendError> {
2386    let content: ContentInput = serde_json::from_value(value.clone())
2387        .map_err(|err| ConsoleSendError::InvalidContent(err.to_string()))?;
2388    match &content {
2389        ContentInput::Text(text) if text.trim().is_empty() => Err(
2390            ConsoleSendError::InvalidContent("content must be non-empty".to_string()),
2391        ),
2392        ContentInput::Blocks(blocks) if blocks.is_empty() => Err(ConsoleSendError::InvalidContent(
2393            "content blocks must be non-empty".to_string(),
2394        )),
2395        _ => Ok(content),
2396    }
2397}
2398
2399fn parse_handling_mode(
2400    value: Option<&str>,
2401) -> Result<meerkat_core::types::HandlingMode, ConsoleSendError> {
2402    match value.unwrap_or("queue") {
2403        "queue" => Ok(meerkat_core::types::HandlingMode::Queue),
2404        "steer" => Ok(meerkat_core::types::HandlingMode::Steer),
2405        other => Err(ConsoleSendError::InvalidHandlingMode(other.to_string())),
2406    }
2407}
2408
2409fn accepted_from_frame(frame: &ConsoleFrame) -> ConsoleInteractionAccepted {
2410    ConsoleInteractionAccepted {
2411        interaction_id: frame
2412            .interaction_id
2413            .clone()
2414            .unwrap_or_else(|| format!("console-interaction-{}", hash_short(&frame.dedupe_key))),
2415        identity: frame.identity.clone(),
2416        conversation_id: frame.conversation_id.clone(),
2417        session_id: frame.session_id.clone(),
2418        input_frame_id: frame.id.clone(),
2419        cursor: frame.cursor.clone(),
2420        status: frame.status,
2421    }
2422}
2423
/// Builds the dedupe key for a send request:
/// `send:<runtime_key>:<identity>:<origin>:<idempotency_key>`.
fn send_dedupe_key(
    runtime_key: &str,
    identity: &str,
    origin: &str,
    idempotency_key: &str,
) -> String {
    ["send", runtime_key, identity, origin, idempotency_key].join(":")
}
2432
2433fn send_request_fingerprint(origin: &str, content: &Value, handling_mode: &str) -> String {
2434    let content_json = serde_json::to_string(content).unwrap_or_default();
2435    hash_short(&format!("{origin}\n{handling_mode}\n{content_json}"))
2436}
2437
2438fn hash_short(value: &str) -> String {
2439    let mut hasher = Sha256::new();
2440    hasher.update(value.as_bytes());
2441    let digest = hasher.finalize();
2442    to_hex(&digest[..8])
2443}
2444
/// Encodes `bytes` as lowercase hexadecimal, two characters per byte.
fn to_hex(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";
    bytes
        .iter()
        .flat_map(|&byte| {
            [
                DIGITS[usize::from(byte >> 4)],
                DIGITS[usize::from(byte & 0x0f)],
            ]
        })
        .map(char::from)
        .collect()
}
2454
/// Current wall-clock time as milliseconds since the Unix epoch, or `0` when the
/// system clock reports a time before the epoch.
fn current_time_ms() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |since_epoch| since_epoch.as_millis() as u64)
}
2461
2462fn runtime_registry_lock_error() -> ConsoleLogError {
2463    Box::new(std::io::Error::other(
2464        "console runtime registry lock poisoned",
2465    ))
2466}
2467
2468#[cfg(test)]
2469#[allow(clippy::expect_used)]
2470mod tests {
2471    use std::sync::Arc;
2472    use std::sync::atomic::{AtomicUsize, Ordering};
2473    use std::time::Duration;
2474    use std::time::Instant;
2475
2476    use futures::StreamExt;
2477    use meerkat::{AgentFactory, Config, build_ephemeral_service};
2478    use meerkat_client::types::LlmStream;
2479    use meerkat_client::{LlmClient, LlmDoneOutcome, LlmError, LlmEvent, LlmRequest, TestClient};
2480    use meerkat_core::{
2481        AppendSystemContextRequest, AppendSystemContextResult, CommsRuntime, EventStream,
2482        RunResult, SessionControlError, SessionError, SessionHistoryPage, SessionHistoryQuery,
2483        SessionId, SessionQuery, SessionService, SessionServiceCommsExt, SessionServiceControlExt,
2484        SessionServiceHistoryExt, SessionSummary, SessionView, StartTurnRequest, StopReason,
2485        StreamError,
2486    };
2487    use meerkat_mob::{MobDefinition, MobSessionService, MobStorage, SpawnMemberSpec};
2488    use serde_json::json;
2489
2490    use super::*;
2491    use crate::mob_handle_runtime::MobBootstrapSpec;
2492
2493    struct CountingConsoleLogStore {
2494        inner: InMemoryConsoleLogStore,
2495        source_watermark_calls: AtomicUsize,
2496        record_watermark_calls: AtomicUsize,
2497    }
2498
2499    impl CountingConsoleLogStore {
2500        fn new() -> Self {
2501            Self {
2502                inner: InMemoryConsoleLogStore::new(),
2503                source_watermark_calls: AtomicUsize::new(0),
2504                record_watermark_calls: AtomicUsize::new(0),
2505            }
2506        }
2507
2508        fn source_watermark_calls(&self) -> usize {
2509            self.source_watermark_calls.load(Ordering::SeqCst)
2510        }
2511    }
2512
2513    struct SlowTestClient {
2514        delay: Duration,
2515    }
2516
2517    #[async_trait::async_trait]
2518    impl LlmClient for SlowTestClient {
2519        fn project_replay_messages(&self, messages: &[Message]) -> Result<Vec<Message>, LlmError> {
2520            Ok(messages.to_vec())
2521        }
2522
2523        fn stream<'a>(&'a self, _request: &'a LlmRequest) -> LlmStream<'a> {
2524            let delay = self.delay;
2525            let delayed_text = futures::stream::once(async move {
2526                tokio::time::sleep(delay).await;
2527                Ok(LlmEvent::TextDelta {
2528                    delta: "slow ok".to_string(),
2529                    meta: None,
2530                })
2531            });
2532            let done = futures::stream::once(async {
2533                Ok(LlmEvent::Done {
2534                    outcome: LlmDoneOutcome::Success {
2535                        stop_reason: StopReason::EndTurn,
2536                    },
2537                })
2538            });
2539            Box::pin(delayed_text.chain(done))
2540        }
2541
2542        fn provider(&self) -> &'static str {
2543            "slow-test"
2544        }
2545
2546        async fn health_check(&self) -> Result<(), LlmError> {
2547            Ok(())
2548        }
2549    }
2550
2551    #[derive(Clone)]
2552    struct DelayedHistorySessionService {
2553        inner: Arc<dyn MobSessionService>,
2554        delay: Duration,
2555        read_calls: Arc<AtomicUsize>,
2556        active_reads: Arc<AtomicUsize>,
2557        max_active_reads: Arc<AtomicUsize>,
2558    }
2559
2560    impl DelayedHistorySessionService {
2561        fn new(inner: Arc<dyn MobSessionService>, delay: Duration) -> Self {
2562            Self {
2563                inner,
2564                delay,
2565                read_calls: Arc::new(AtomicUsize::new(0)),
2566                active_reads: Arc::new(AtomicUsize::new(0)),
2567                max_active_reads: Arc::new(AtomicUsize::new(0)),
2568            }
2569        }
2570
2571        fn read_calls(&self) -> usize {
2572            self.read_calls.load(Ordering::SeqCst)
2573        }
2574
2575        fn max_active_reads(&self) -> usize {
2576            self.max_active_reads.load(Ordering::SeqCst)
2577        }
2578    }
2579
2580    #[async_trait::async_trait]
2581    impl SessionService for DelayedHistorySessionService {
2582        async fn create_session(
2583            &self,
2584            req: meerkat_core::CreateSessionRequest,
2585        ) -> Result<RunResult, SessionError> {
2586            self.inner.create_session(req).await
2587        }
2588
2589        async fn start_turn(
2590            &self,
2591            id: &SessionId,
2592            req: StartTurnRequest,
2593        ) -> Result<RunResult, SessionError> {
2594            self.inner.start_turn(id, req).await
2595        }
2596
2597        async fn interrupt(&self, id: &SessionId) -> Result<(), SessionError> {
2598            self.inner.interrupt(id).await
2599        }
2600
2601        async fn read(&self, id: &SessionId) -> Result<SessionView, SessionError> {
2602            self.inner.read(id).await
2603        }
2604
2605        async fn list(&self, query: SessionQuery) -> Result<Vec<SessionSummary>, SessionError> {
2606            self.inner.list(query).await
2607        }
2608
2609        async fn archive(&self, id: &SessionId) -> Result<(), SessionError> {
2610            self.inner.archive(id).await
2611        }
2612
2613        async fn subscribe_session_events(
2614            &self,
2615            id: &SessionId,
2616        ) -> Result<EventStream, StreamError> {
2617            SessionService::subscribe_session_events(self.inner.as_ref(), id).await
2618        }
2619    }
2620
2621    #[async_trait::async_trait]
2622    impl SessionServiceCommsExt for DelayedHistorySessionService {
2623        async fn comms_runtime(&self, session_id: &SessionId) -> Option<Arc<dyn CommsRuntime>> {
2624            self.inner.comms_runtime(session_id).await
2625        }
2626    }
2627
2628    #[async_trait::async_trait]
2629    impl SessionServiceControlExt for DelayedHistorySessionService {
2630        async fn append_system_context(
2631            &self,
2632            id: &SessionId,
2633            req: AppendSystemContextRequest,
2634        ) -> Result<AppendSystemContextResult, SessionControlError> {
2635            self.inner.append_system_context(id, req).await
2636        }
2637    }
2638
2639    #[async_trait::async_trait]
2640    impl SessionServiceHistoryExt for DelayedHistorySessionService {
2641        async fn read_history(
2642            &self,
2643            id: &SessionId,
2644            query: SessionHistoryQuery,
2645        ) -> Result<SessionHistoryPage, SessionError> {
2646            self.read_calls.fetch_add(1, Ordering::SeqCst);
2647            let active = self.active_reads.fetch_add(1, Ordering::SeqCst) + 1;
2648            self.max_active_reads.fetch_max(active, Ordering::SeqCst);
2649            tokio::time::sleep(self.delay).await;
2650            let result = self.inner.read_history(id, query).await;
2651            self.active_reads.fetch_sub(1, Ordering::SeqCst);
2652            result
2653        }
2654    }
2655
2656    #[async_trait::async_trait]
2657    impl MobSessionService for DelayedHistorySessionService {
2658        fn supports_persistent_sessions(&self) -> bool {
2659            self.inner.supports_persistent_sessions()
2660        }
2661
2662        fn runtime_adapter(&self) -> Option<Arc<meerkat_runtime::MeerkatMachine>> {
2663            self.inner.runtime_adapter()
2664        }
2665
2666        async fn session_belongs_to_mob(
2667            &self,
2668            session_id: &SessionId,
2669            mob_id: &meerkat_mob::MobId,
2670        ) -> bool {
2671            self.inner.session_belongs_to_mob(session_id, mob_id).await
2672        }
2673
2674        async fn cancel_all_checkpointers(&self) {
2675            self.inner.cancel_all_checkpointers().await;
2676        }
2677
2678        async fn rearm_all_checkpointers(&self) {
2679            self.inner.rearm_all_checkpointers().await;
2680        }
2681    }
2682
2683    #[async_trait::async_trait]
2684    impl ConsoleLogStore for CountingConsoleLogStore {
2685        async fn append_if_absent(
2686            &self,
2687            frame: NewConsoleFrame,
2688        ) -> ConsoleLogResult<AppendOutcome> {
2689            self.inner.append_if_absent(frame).await
2690        }
2691
2692        async fn update_frame_status(
2693            &self,
2694            frame_id: &str,
2695            status: ConsoleFrameStatus,
2696        ) -> ConsoleLogResult<Option<ConsoleFrame>> {
2697            self.inner.update_frame_status(frame_id, status).await
2698        }
2699
2700        async fn query_frames(
2701            &self,
2702            query: ConsoleTimelineQuery,
2703        ) -> ConsoleLogResult<ConsoleTimelinePage> {
2704            self.inner.query_frames(query).await
2705        }
2706
2707        async fn frame_by_dedupe_key(
2708            &self,
2709            dedupe_key: &str,
2710        ) -> ConsoleLogResult<Option<ConsoleFrame>> {
2711            self.inner.frame_by_dedupe_key(dedupe_key).await
2712        }
2713
2714        async fn latest_cursor(&self) -> ConsoleLogResult<Option<ConsoleCursor>> {
2715            self.inner.latest_cursor().await
2716        }
2717
2718        async fn clear_frames(&self) -> ConsoleLogResult<()> {
2719            self.inner.clear_frames().await
2720        }
2721
2722        async fn record_source_watermark(
2723            &self,
2724            runtime_key: &str,
2725            source_kind: ConsoleFrameSourceKind,
2726            source_cursor: &str,
2727        ) -> ConsoleLogResult<()> {
2728            self.record_watermark_calls.fetch_add(1, Ordering::SeqCst);
2729            self.inner
2730                .record_source_watermark(runtime_key, source_kind, source_cursor)
2731                .await
2732        }
2733
2734        async fn source_watermark(
2735            &self,
2736            runtime_key: &str,
2737            source_kind: ConsoleFrameSourceKind,
2738        ) -> ConsoleLogResult<Option<String>> {
2739            self.source_watermark_calls.fetch_add(1, Ordering::SeqCst);
2740            self.inner.source_watermark(runtime_key, source_kind).await
2741        }
2742    }
2743
2744    async fn build_single_member_runtime() -> UnifiedRuntime {
2745        build_single_member_runtime_with_client(Arc::new(TestClient::default())).await
2746    }
2747
2748    async fn build_single_member_runtime_with_client(client: Arc<dyn LlmClient>) -> UnifiedRuntime {
2749        let definition = MobDefinition::from_toml(
2750            r#"
2751[mob]
2752id = "console-aggregator-perf-test"
2753
2754[profiles.worker]
2755model = "gpt-5.5"
2756external_addressable = true
2757
2758[profiles.worker.tools]
2759comms = true
2760"#,
2761        )
2762        .expect("definition parses");
2763        let runtime = UnifiedRuntime::builder()
2764            .definition(definition)
2765            .default_llm_client(client)
2766            .build()
2767            .await
2768            .expect("runtime builds");
2769        runtime
2770            .spawn(SpawnMemberSpec::from_wire(
2771                "worker".to_string(),
2772                "agent-a".to_string(),
2773                Some("You are agent-a.".into()),
2774                None,
2775                None,
2776            ))
2777            .await
2778            .expect("member spawns");
2779        runtime
2780    }
2781
2782    async fn build_empty_runtime(mob_id: &str) -> UnifiedRuntime {
2783        let definition = MobDefinition::from_toml(&format!(
2784            r#"
2785[mob]
2786id = "{mob_id}"
2787
2788[profiles.worker]
2789model = "gpt-5.5"
2790external_addressable = true
2791
2792[profiles.worker.tools]
2793comms = true
2794"#
2795        ))
2796        .expect("definition parses");
2797        UnifiedRuntime::builder()
2798            .definition(definition)
2799            .default_llm_client(Arc::new(TestClient::default()))
2800            .build()
2801            .await
2802            .expect("runtime builds")
2803    }
2804
2805    async fn build_stress_runtime(
2806        member_count: usize,
2807        history_delay: Duration,
2808    ) -> (
2809        tempfile::TempDir,
2810        Arc<UnifiedRuntime>,
2811        DelayedHistorySessionService,
2812    ) {
2813        let temp_dir = tempfile::tempdir().expect("temp dir");
2814        let session_path = temp_dir.path().join("sessions");
2815        std::fs::create_dir_all(&session_path).expect("session path");
2816        let factory = AgentFactory::new(&session_path).comms(true);
2817        let base_service = Arc::new(build_ephemeral_service(
2818            factory,
2819            Config::default(),
2820            member_count + 8,
2821        ));
2822        let delayed_service = DelayedHistorySessionService::new(base_service, history_delay);
2823        let session_service: Arc<dyn MobSessionService> = Arc::new(delayed_service.clone());
2824        let definition = MobDefinition::from_toml(
2825            r#"
2826[mob]
2827id = "console-aggregator-stress-test"
2828
2829[profiles.worker]
2830model = "gpt-5.5"
2831external_addressable = true
2832
2833[profiles.worker.tools]
2834comms = true
2835"#,
2836        )
2837        .expect("definition parses");
2838        let spec = MobBootstrapSpec::new(definition, MobStorage::in_memory(), session_service)
2839            .with_options(crate::mob_handle_runtime::MobBootstrapOptions {
2840                allow_ephemeral_sessions: true,
2841                notify_orchestrator_on_resume: true,
2842                default_llm_client: Some(Arc::new(TestClient::default())),
2843            });
2844        let runtime = Arc::new(
2845            UnifiedRuntime::bootstrap(
2846                spec,
2847                crate::types::MobKitConfig {
2848                    modules: Vec::new(),
2849                    discovery: crate::types::DiscoverySpec {
2850                        namespace: "stress".to_string(),
2851                        modules: Vec::new(),
2852                    },
2853                    pre_spawn: Vec::new(),
2854                },
2855                Duration::from_secs(5),
2856            )
2857            .await
2858            .expect("runtime boots"),
2859        );
2860        for idx in 0..member_count {
2861            runtime
2862                .spawn(SpawnMemberSpec::from_wire(
2863                    "worker".to_string(),
2864                    format!("agent-{idx}"),
2865                    Some(format!("You are agent-{idx}.").into()),
2866                    None,
2867                    None,
2868                ))
2869                .await
2870                .expect("member spawns");
2871        }
2872        (temp_dir, runtime, delayed_service)
2873    }
2874
2875    fn runtime_entry_for_test(runtime_key: &str, runtime: &UnifiedRuntime) -> RuntimeEntry {
2876        RuntimeEntry {
2877            runtime_key: runtime_key.to_string(),
2878            identity_namespace: "test".to_string(),
2879            runtime: runtime.mob_runtime().clone(),
2880            console_events: runtime.console_events(),
2881            visibility_policy: Arc::new(AllowAllConsoleVisibilityPolicy),
2882        }
2883    }
2884
2885    fn identity_record_for_test(identity: &str) -> ConsoleIdentityRecord {
2886        ConsoleIdentityRecord {
2887            identity: identity.to_string(),
2888            display_name: identity.to_string(),
2889            runtime_key: "runtime-cache".to_string(),
2890            runtime_member_id: identity.to_string(),
2891            session_id: Some(format!("session-{identity}")),
2892            visibility: ConsoleVisibility::Addressable,
2893            addressable: true,
2894            health: "ready".to_string(),
2895            labels: BTreeMap::new(),
2896        }
2897    }
2898
2899    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2900    async fn identity_record_uses_durable_agent_identity_label_for_identity_first_members()
2901    -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
2902        let runtime = build_empty_runtime("identity-label-test").await;
2903        let mut labels = BTreeMap::new();
2904        labels.insert(
2905            "agent_identity".to_string(),
2906            "channel:C0SMOKEOB3".to_string(),
2907        );
2908        labels.insert("display_name".to_string(), "C0SMOKEOB3".to_string());
2909        runtime
2910            .spawn(
2911                SpawnMemberSpec::from_wire(
2912                    "worker".to_string(),
2913                    "rt:channel:C0SMOKEOB3:0".to_string(),
2914                    Some("You are C0SMOKEOB3.".into()),
2915                    None,
2916                    None,
2917                )
2918                .with_labels(labels),
2919            )
2920            .await
2921            .expect("member spawns");
2922
2923        let entry = RuntimeEntry {
2924            runtime_key: "runtime-a".to_string(),
2925            identity_namespace: String::new(),
2926            runtime: runtime.mob_runtime().clone(),
2927            console_events: runtime.console_events(),
2928            visibility_policy: Arc::new(AllowAllConsoleVisibilityPolicy),
2929        };
2930        let aggregator = MobKitConsoleAggregator::in_memory();
2931        aggregator
2932            .inner
2933            .runtimes
2934            .write()
2935            .expect("runtime registry")
2936            .insert("runtime-a".to_string(), entry);
2937
2938        let records = aggregator.list_identities_fresh().await?;
2939        let record = records
2940            .iter()
2941            .find(|record| record.identity == "channel:C0SMOKEOB3")
2942            .expect("durable identity is exposed");
2943        assert_eq!(record.runtime_member_id, "rt:channel:C0SMOKEOB3:0");
2944
2945        let inspection = aggregator
2946            .inspect_identity("channel:C0SMOKEOB3")
2947            .await?
2948            .expect("durable identity resolves back to runtime member");
2949        assert_eq!(inspection.identity.identity, "channel:C0SMOKEOB3");
2950        assert_eq!(
2951            inspection.identity.runtime_member_id,
2952            "rt:channel:C0SMOKEOB3:0"
2953        );
2954
2955        let _ = runtime.mob_handle().stop().await;
2956        Ok(())
2957    }
2958
2959    #[tokio::test]
2960    async fn list_identities_serves_hot_cache_while_identity_refresh_is_in_flight() {
2961        let aggregator = MobKitConsoleAggregator::in_memory();
2962        let record = identity_record_for_test("agent-cached");
2963        *aggregator.inner.identity_read_model.inner.write().await = vec![record.clone()];
2964        aggregator
2965            .inner
2966            .identity_read_model
2967            .primed
2968            .store(true, Ordering::Release);
2969        let _guard = aggregator
2970            .inner
2971            .identity_read_model
2972            .refresh_lock
2973            .clone()
2974            .lock_owned()
2975            .await;
2976
2977        let identities =
2978            tokio::time::timeout(Duration::from_millis(50), aggregator.list_identities())
2979                .await
2980                .expect("hot identity list should not wait for refresh lock")
2981                .expect("identity list succeeds");
2982
2983        assert_eq!(identities, vec![record]);
2984    }
2985
2986    #[tokio::test]
2987    async fn list_identities_waits_for_inflight_identity_refresh_on_cold_cache() {
2988        let aggregator = MobKitConsoleAggregator::in_memory();
2989        let guard = aggregator
2990            .inner
2991            .identity_read_model
2992            .refresh_lock
2993            .clone()
2994            .lock_owned()
2995            .await;
2996        let waiter = tokio::spawn({
2997            let aggregator = aggregator.clone();
2998            async move { aggregator.list_identities().await }
2999        });
3000
3001        tokio::time::sleep(Duration::from_millis(20)).await;
3002        assert!(
3003            !waiter.is_finished(),
3004            "cold identity list should wait for the in-flight refresh to finish"
3005        );
3006
3007        let record = identity_record_for_test("agent-primed");
3008        *aggregator.inner.identity_read_model.inner.write().await = vec![record.clone()];
3009        aggregator
3010            .inner
3011            .identity_read_model
3012            .primed
3013            .store(true, Ordering::Release);
3014        drop(guard);
3015
3016        let identities = tokio::time::timeout(Duration::from_secs(1), waiter)
3017            .await
3018            .expect("cold identity list waiter should resume")
3019            .expect("waiter joins")
3020            .expect("identity list succeeds");
3021        assert_eq!(identities, vec![record]);
3022    }
3023
3024    #[tokio::test]
3025    async fn query_timeline_reads_from_aggregate_store() {
3026        let aggregator = MobKitConsoleAggregator::in_memory();
3027        let frame = NewConsoleFrame {
3028            id: None,
3029            dedupe_key: "event-1".to_string(),
3030            timestamp_ms: 1,
3031            runtime_key: "runtime-a".to_string(),
3032            identity: "agent-a".to_string(),
3033            conversation_id: Some("agent-a".to_string()),
3034            session_id: None,
3035            kind: "text_delta".to_string(),
3036            status: ConsoleFrameStatus::Delivered,
3037            payload: json!({ "delta": "hello" }),
3038            source: ConsoleFrameSource {
3039                kind: ConsoleFrameSourceKind::ConsoleEvent,
3040                source_cursor: None,
3041            },
3042            source_event_id: Some("event-1".to_string()),
3043            interaction_id: None,
3044            turn_id: None,
3045            run_id: None,
3046            parent_frame_id: None,
3047            caused_by_frame_id: None,
3048        };
3049        aggregator
3050            .store()
3051            .append_if_absent(frame)
3052            .await
3053            .expect("append frame");
3054
3055        let page = aggregator
3056            .query_timeline(ConsoleTimelineQuery {
3057                identity: Some("agent-a".to_string()),
3058                limit: 10,
3059                ..ConsoleTimelineQuery::default()
3060            })
3061            .await
3062            .expect("query timeline");
3063        assert_eq!(page.frames.len(), 1);
3064        assert_eq!(page.frames[0].kind, "text_delta");
3065    }
3066
3067    #[tokio::test]
3068    async fn query_timeline_is_store_local_for_registered_runtimes() {
3069        let store = Arc::new(CountingConsoleLogStore::new());
3070        let aggregator = MobKitConsoleAggregator::new(store.clone());
3071        let runtime = build_single_member_runtime().await;
3072        aggregator
3073            .inner
3074            .runtimes
3075            .write()
3076            .expect("runtime registry")
3077            .insert(
3078                "runtime-a".to_string(),
3079                runtime_entry_for_test("runtime-a", &runtime),
3080            );
3081        store
3082            .append_if_absent(NewConsoleFrame {
3083                id: None,
3084                dedupe_key: "event-1".to_string(),
3085                timestamp_ms: 1,
3086                runtime_key: "runtime-a".to_string(),
3087                identity: "agent-a".to_string(),
3088                conversation_id: Some("agent-a".to_string()),
3089                session_id: None,
3090                kind: "text_delta".to_string(),
3091                status: ConsoleFrameStatus::Delivered,
3092                payload: json!({ "delta": "hello" }),
3093                source: ConsoleFrameSource {
3094                    kind: ConsoleFrameSourceKind::ConsoleEvent,
3095                    source_cursor: None,
3096                },
3097                source_event_id: Some("event-1".to_string()),
3098                interaction_id: None,
3099                turn_id: None,
3100                run_id: None,
3101                parent_frame_id: None,
3102                caused_by_frame_id: None,
3103            })
3104            .await
3105            .expect("append frame");
3106
3107        let page = tokio::time::timeout(
3108            Duration::from_millis(250),
3109            aggregator.query_timeline(ConsoleTimelineQuery {
3110                identity: Some("agent-a".to_string()),
3111                limit: 10,
3112                ..ConsoleTimelineQuery::default()
3113            }),
3114        )
3115        .await
3116        .expect("timeline query should not wait for session history")
3117        .expect("timeline query succeeds");
3118
3119        assert_eq!(page.frames.len(), 1);
3120        assert_eq!(
3121            store.source_watermark_calls(),
3122            0,
3123            "query_timeline must not synchronously touch session-history watermarks"
3124        );
3125        let _ = runtime.mob_handle().stop().await;
3126    }
3127
3128    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
3129    async fn console_send_returns_after_acceptance_without_waiting_for_turn_completion() {
3130        let runtime = build_single_member_runtime_with_client(Arc::new(SlowTestClient {
3131            delay: Duration::from_secs(2),
3132        }))
3133        .await;
3134        let aggregator = MobKitConsoleAggregator::in_memory();
3135        aggregator
3136            .inner
3137            .runtimes
3138            .write()
3139            .expect("runtime registry")
3140            .insert(
3141                "runtime-a".to_string(),
3142                runtime_entry_for_test("runtime-a", &runtime),
3143            );
3144
3145        let start = Instant::now();
3146        let accepted = tokio::time::timeout(
3147            Duration::from_millis(300),
3148            aggregator.send(ConsoleSendRequest {
3149                identity: "test/agent-a".to_string(),
3150                content: json!("hello slow agent"),
3151                origin: "console:test".to_string(),
3152                idempotency_key: "nonblocking-send".to_string(),
3153                handling_mode: Some("queue".to_string()),
3154            }),
3155        )
3156        .await
3157        .expect("console send should return once the input is accepted")
3158        .expect("send succeeds");
3159
3160        assert_eq!(accepted.status, ConsoleFrameStatus::Accepted);
3161        assert!(
3162            start.elapsed() < Duration::from_secs(1),
3163            "console send should not wait for the delayed LLM turn"
3164        );
3165
3166        wait_for_session_history_text(
3167            &aggregator,
3168            "test/agent-a",
3169            "slow ok",
3170            Duration::from_secs(5),
3171        )
3172        .await
3173        .expect("background dispatch should still complete and project history");
3174        let _ = runtime.mob_handle().stop().await;
3175    }
3176
3177    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
3178    async fn discovered_late_member_session_backfills_without_manual_refresh() -> Result<(), String>
3179    {
3180        let runtime = Arc::new(build_empty_runtime("console-aggregator-late-member-test").await);
3181        let aggregator = MobKitConsoleAggregator::in_memory();
3182        aggregator.register_runtime(ConsoleRuntimeRegistration {
3183            runtime_key: "runtime-late".to_string(),
3184            runtime: runtime.clone(),
3185            identity_namespace: "late".to_string(),
3186            visibility_policy: Arc::new(AllowAllConsoleVisibilityPolicy),
3187        });
3188
3189        runtime
3190            .spawn(SpawnMemberSpec::from_wire(
3191                "worker".to_string(),
3192                "agent-late".to_string(),
3193                Some("You are agent-late.".into()),
3194                None,
3195                None,
3196            ))
3197            .await
3198            .expect("late member spawns");
3199        let session_id = send_message_on_mob_with_mode(
3200            &runtime.mob_handle(),
3201            "agent-late",
3202            ContentInput::Text("hello after registration".to_string()),
3203            meerkat_core::types::HandlingMode::Queue,
3204        )
3205        .await
3206        .expect("direct member send succeeds");
3207        wait_for_identity_record(
3208            &aggregator,
3209            "late/agent-late",
3210            Some(session_id.as_str()),
3211            Duration::from_secs(5),
3212        )
3213        .await?;
3214
3215        wait_for_session_history_text(
3216            &aggregator,
3217            "late/agent-late",
3218            "You are agent-late.",
3219            Duration::from_secs(5),
3220        )
3221        .await?;
3222        let _ = runtime.mob_handle().stop().await;
3223        Ok(())
3224    }
3225
3226    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
3227    async fn explicit_empty_identity_query_force_refreshes_past_fresh_watermark()
3228    -> Result<(), String> {
3229        let runtime = build_single_member_runtime().await;
3230        let entry = runtime_entry_for_test("runtime-a", &runtime);
3231        let resolved = member_sources_for_entry(&entry)
3232            .await
3233            .into_iter()
3234            .find(|candidate| candidate.member.agent_identity.as_str() == "agent-a")
3235            .expect("agent-a member exists");
3236        let record = identity_record_for_member(&entry, &resolved.handle, &resolved.member)
3237            .await
3238            .expect("identity record exists");
3239        let session_id = record.session_id.expect("agent-a has a session");
3240
3241        let aggregator = MobKitConsoleAggregator::in_memory();
3242        aggregator
3243            .inner
3244            .runtimes
3245            .write()
3246            .expect("runtime registry")
3247            .insert("runtime-a".to_string(), entry);
3248        let watermark_key = session_history_watermark_runtime_key("runtime-a", &session_id);
3249        aggregator
3250            .store()
3251            .record_source_watermark(
3252                &watermark_key,
3253                ConsoleFrameSourceKind::SessionHistory,
3254                &format_session_history_watermark(&session_id, 0, current_time_ms()),
3255            )
3256            .await
3257            .expect("record fresh empty watermark");
3258
3259        let page = tokio::time::timeout(
3260            Duration::from_secs(2),
3261            aggregator.query_timeline(ConsoleTimelineQuery {
3262                identity: Some("test/agent-a".to_string()),
3263                limit: 20,
3264                ..ConsoleTimelineQuery::default()
3265            }),
3266        )
3267        .await
3268        .expect("explicit identity query should not stall")
3269        .expect("query succeeds");
3270
3271        assert!(
3272            page.frames.iter().any(|frame| {
3273                frame.source.kind == ConsoleFrameSourceKind::SessionHistory
3274                    && frame.kind == "user_input"
3275                    && session_history_content_text(frame).as_deref() == Some("You are agent-a.")
3276            }),
3277            "explicit identity query should force-refresh stale/fresh empty watermarks; frames: {:#?}",
3278            page.frames
3279        );
3280        let _ = runtime.mob_handle().stop().await;
3281        Ok(())
3282    }
3283
3284    async fn wait_for_session_history_text(
3285        aggregator: &MobKitConsoleAggregator,
3286        identity: &str,
3287        expected: &str,
3288        timeout: Duration,
3289    ) -> Result<(), String> {
3290        let deadline = Instant::now() + timeout;
3291        let mut observed = Vec::new();
3292        while Instant::now() < deadline {
3293            let _ = aggregator.list_identities().await;
3294            let page = aggregator
3295                .query_timeline(ConsoleTimelineQuery {
3296                    identity: Some(identity.to_string()),
3297                    limit: 20,
3298                    ..ConsoleTimelineQuery::default()
3299                })
3300                .await
3301                .expect("query timeline");
3302            observed = page.frames;
3303            if observed.iter().any(|frame| {
3304                frame.source.kind == ConsoleFrameSourceKind::SessionHistory
3305                    && (frame.kind == "user_input" || frame.kind == "interaction_complete")
3306                    && session_history_content_text(frame).as_deref() == Some(expected)
3307            }) {
3308                return Ok(());
3309            }
3310            tokio::time::sleep(Duration::from_millis(25)).await;
3311        }
3312
3313        Err(format!(
3314            "session history text {expected:?} was not backfilled; observed frames: {observed:#?}",
3315        ))
3316    }
3317
3318    async fn wait_for_identity_record(
3319        aggregator: &MobKitConsoleAggregator,
3320        identity: &str,
3321        session_id: Option<&str>,
3322        timeout: Duration,
3323    ) -> Result<(), String> {
3324        let deadline = Instant::now() + timeout;
3325        let mut observed = Vec::new();
3326        while Instant::now() < deadline {
3327            observed = aggregator
3328                .list_identities()
3329                .await
3330                .map_err(|err| err.to_string())?;
3331            if observed.iter().any(|record| {
3332                record.identity == identity && record.session_id.as_deref() == session_id
3333            }) {
3334                return Ok(());
3335            }
3336            tokio::time::sleep(Duration::from_millis(25)).await;
3337        }
3338
3339        Err(format!(
3340            "identity {identity:?} with session {session_id:?} was not projected; observed identities: {observed:#?}",
3341        ))
3342    }
3343
3344    fn session_history_content_text(frame: &ConsoleFrame) -> Option<String> {
3345        if let Some(text) = frame.payload.get("text").and_then(Value::as_str) {
3346            return Some(text.to_string());
3347        }
3348        if let Some(text) = frame.payload.get("result").and_then(Value::as_str) {
3349            return Some(text.to_string());
3350        }
3351        match frame.payload.get("content")? {
3352            Value::String(text) => Some(text.clone()),
3353            Value::Array(blocks) => Some(
3354                blocks
3355                    .iter()
3356                    .filter_map(|block| block.get("text").and_then(Value::as_str))
3357                    .collect::<Vec<_>>()
3358                    .join(""),
3359            ),
3360            _ => None,
3361        }
3362    }
3363
3364    #[tokio::test]
3365    async fn query_timeline_handles_large_store_without_backfill_calls() {
3366        let store = Arc::new(CountingConsoleLogStore::new());
3367        let aggregator = MobKitConsoleAggregator::new(store.clone());
3368        for idx in 0..5_000 {
3369            store
3370                .append_if_absent(NewConsoleFrame {
3371                    id: None,
3372                    dedupe_key: format!("event-{idx}"),
3373                    timestamp_ms: idx,
3374                    runtime_key: "runtime-a".to_string(),
3375                    identity: "agent-a".to_string(),
3376                    conversation_id: Some("agent-a".to_string()),
3377                    session_id: Some("session-a".to_string()),
3378                    kind: "text_delta".to_string(),
3379                    status: ConsoleFrameStatus::Delivered,
3380                    payload: json!({ "delta": idx }),
3381                    source: ConsoleFrameSource {
3382                        kind: ConsoleFrameSourceKind::ConsoleEvent,
3383                        source_cursor: None,
3384                    },
3385                    source_event_id: Some(format!("event-{idx}")),
3386                    interaction_id: None,
3387                    turn_id: None,
3388                    run_id: None,
3389                    parent_frame_id: None,
3390                    caused_by_frame_id: None,
3391                })
3392                .await
3393                .expect("append frame");
3394        }
3395
3396        let page = aggregator
3397            .query_timeline(ConsoleTimelineQuery {
3398                identity: Some("agent-a".to_string()),
3399                limit: 1_000,
3400                ..ConsoleTimelineQuery::default()
3401            })
3402            .await
3403            .expect("large query");
3404
3405        assert_eq!(page.frames.len(), 1_000);
3406        assert_eq!(store.source_watermark_calls(), 0);
3407    }
3408
3409    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
3410    async fn explicit_identity_timeline_query_does_not_resolve_roster_per_frame() {
3411        let store = Arc::new(CountingConsoleLogStore::new());
3412        let aggregator = MobKitConsoleAggregator::new(store.clone());
3413        let (_temp, runtime, _delayed_service) =
3414            build_stress_runtime(64, Duration::from_millis(0)).await;
3415        aggregator
3416            .inner
3417            .runtimes
3418            .write()
3419            .expect("runtime registry")
3420            .insert(
3421                "runtime-a".to_string(),
3422                runtime_entry_for_test("runtime-a", runtime.as_ref()),
3423            );
3424        for idx in 0..1_000 {
3425            store
3426                .append_if_absent(NewConsoleFrame {
3427                    id: None,
3428                    dedupe_key: format!("event-{idx}"),
3429                    timestamp_ms: idx,
3430                    runtime_key: "runtime-a".to_string(),
3431                    identity: "test/agent-0".to_string(),
3432                    conversation_id: Some("test/agent-0".to_string()),
3433                    session_id: Some("session-a".to_string()),
3434                    kind: "text_delta".to_string(),
3435                    status: ConsoleFrameStatus::Delivered,
3436                    payload: json!({ "delta": idx }),
3437                    source: ConsoleFrameSource {
3438                        kind: ConsoleFrameSourceKind::ConsoleEvent,
3439                        source_cursor: None,
3440                    },
3441                    source_event_id: Some(format!("event-{idx}")),
3442                    interaction_id: None,
3443                    turn_id: None,
3444                    run_id: None,
3445                    parent_frame_id: None,
3446                    caused_by_frame_id: None,
3447                })
3448                .await
3449                .expect("append frame");
3450        }
3451
3452        let page = tokio::time::timeout(
3453            Duration::from_secs(2),
3454            aggregator.query_timeline(ConsoleTimelineQuery {
3455                identity: Some("test/agent-0".to_string()),
3456                limit: 1_000,
3457                ..ConsoleTimelineQuery::default()
3458            }),
3459        )
3460        .await
3461        .expect("identity timeline query should not rediscover the roster per frame")
3462        .expect("timeline query succeeds");
3463
3464        assert_eq!(page.frames.len(), 1_000);
3465        assert_eq!(store.source_watermark_calls(), 0);
3466        let _ = runtime.mob_handle().stop().await;
3467    }
3468
3469    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
3470    async fn refresh_session_history_parallelizes_slow_member_backfills_at_scale() {
3471        const MEMBER_COUNT: usize = 32;
3472        let (_temp, runtime, delayed_service) =
3473            build_stress_runtime(MEMBER_COUNT, Duration::from_millis(40)).await;
3474        let aggregator = MobKitConsoleAggregator::in_memory();
3475        aggregator
3476            .inner
3477            .runtimes
3478            .write()
3479            .expect("runtime registry")
3480            .insert(
3481                "runtime-stress".to_string(),
3482                runtime_entry_for_test("runtime-stress", &runtime),
3483            );
3484
3485        let started = Instant::now();
3486        aggregator
3487            .refresh_session_history()
3488            .await
3489            .expect("stress refresh");
3490        let elapsed = started.elapsed();
3491
3492        assert!(
3493            delayed_service.read_calls() >= MEMBER_COUNT,
3494            "expected at least one history read per member, saw {}",
3495            delayed_service.read_calls()
3496        );
3497        assert!(
3498            delayed_service.max_active_reads() > 1,
3499            "session history backfill should fan out instead of reading members serially"
3500        );
3501        assert!(
3502            delayed_service.max_active_reads()
3503                <= ConsoleAggregatorOptions::default().max_concurrent_session_backfills,
3504            "session history backfill should respect the default concurrency limit"
3505        );
3506        assert!(
3507            elapsed < Duration::from_millis(600),
3508            "parallel backfill should be far below serial {}ms path, elapsed: {elapsed:?}",
3509            MEMBER_COUNT * 40
3510        );
3511        let _ = runtime.mob_handle().stop().await;
3512    }
3513
3514    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
3515    async fn session_history_backfill_respects_configured_concurrency_limit() {
3516        const MEMBER_COUNT: usize = 16;
3517        let (_temp, runtime, delayed_service) =
3518            build_stress_runtime(MEMBER_COUNT, Duration::from_millis(30)).await;
3519        let aggregator =
3520            MobKitConsoleAggregator::in_memory_with_options(ConsoleAggregatorOptions {
3521                max_concurrent_session_backfills: 4,
3522                ..ConsoleAggregatorOptions::default()
3523            });
3524        aggregator
3525            .inner
3526            .runtimes
3527            .write()
3528            .expect("runtime registry")
3529            .insert(
3530                "runtime-stress".to_string(),
3531                runtime_entry_for_test("runtime-stress", &runtime),
3532            );
3533
3534        aggregator
3535            .refresh_session_history()
3536            .await
3537            .expect("stress refresh");
3538
3539        assert!(
3540            delayed_service.max_active_reads() <= 4,
3541            "configured concurrency limit should bound session history reads, saw {}",
3542            delayed_service.max_active_reads()
3543        );
3544        let _ = runtime.mob_handle().stop().await;
3545    }
3546
3547    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
3548    async fn live_event_burst_reaches_store_while_slow_backfill_is_running() {
3549        const MEMBER_COUNT: usize = 24;
3550        const LIVE_EVENT_COUNT: usize = 2_048;
3551        let (_temp, runtime, _delayed_service) =
3552            build_stress_runtime(MEMBER_COUNT, Duration::from_millis(200)).await;
3553        let aggregator = MobKitConsoleAggregator::in_memory();
3554        aggregator.register_runtime(ConsoleRuntimeRegistration {
3555            runtime_key: "runtime-burst".to_string(),
3556            runtime: runtime.clone(),
3557            identity_namespace: "stress".to_string(),
3558            visibility_policy: Arc::new(AllowAllConsoleVisibilityPolicy),
3559        });
3560        let console_events = runtime.console_events();
3561        for idx in 0..LIVE_EVENT_COUNT {
3562            console_events
3563                .append(
3564                    "agent-0",
3565                    Some("burst-turn".to_string()),
3566                    "text_delta",
3567                    json!({ "delta": format!("frame-{idx}") }),
3568                )
3569                .await;
3570        }
3571
3572        let deadline = Instant::now() + Duration::from_secs(5);
3573        let mut observed = 0;
3574        while Instant::now() < deadline {
3575            observed = count_console_event_frames(&aggregator, "stress/agent-0").await;
3576            if observed >= LIVE_EVENT_COUNT {
3577                break;
3578            }
3579            tokio::time::sleep(Duration::from_millis(25)).await;
3580        }
3581
3582        assert_eq!(
3583            observed, LIVE_EVENT_COUNT,
3584            "live pump should not drop frames while slow background backfill is running"
3585        );
3586        let _ = runtime.mob_handle().stop().await;
3587    }
3588
3589    async fn count_console_event_frames(
3590        aggregator: &MobKitConsoleAggregator,
3591        identity: &str,
3592    ) -> usize {
3593        let mut after = None;
3594        let mut count = 0;
3595        loop {
3596            let page = aggregator
3597                .query_timeline(ConsoleTimelineQuery {
3598                    identity: Some(identity.to_string()),
3599                    after,
3600                    limit: 1_000,
3601                    ..ConsoleTimelineQuery::default()
3602                })
3603                .await
3604                .expect("query burst timeline");
3605            if page.frames.is_empty() {
3606                break;
3607            }
3608            count += page
3609                .frames
3610                .iter()
3611                .filter(|frame| frame.source.kind == ConsoleFrameSourceKind::ConsoleEvent)
3612                .count();
3613            after = page.next_cursor;
3614            if after.is_none() {
3615                break;
3616            }
3617        }
3618        count
3619    }
3620
3621    #[tokio::test]
3622    async fn status_updates_get_replayable_aggregate_cursors() {
3623        let aggregator = MobKitConsoleAggregator::in_memory();
3624        let frame = NewConsoleFrame {
3625            id: None,
3626            dedupe_key: "send-1".to_string(),
3627            timestamp_ms: 1,
3628            runtime_key: "runtime-a".to_string(),
3629            identity: "agent-a".to_string(),
3630            conversation_id: Some("agent-a".to_string()),
3631            session_id: Some("session-1".to_string()),
3632            kind: "user_input".to_string(),
3633            status: ConsoleFrameStatus::Accepted,
3634            payload: json!({ "content": "hello" }),
3635            source: ConsoleFrameSource {
3636                kind: ConsoleFrameSourceKind::Send,
3637                source_cursor: None,
3638            },
3639            source_event_id: None,
3640            interaction_id: Some("interaction-1".to_string()),
3641            turn_id: None,
3642            run_id: None,
3643            parent_frame_id: None,
3644            caused_by_frame_id: None,
3645        };
3646        let inserted = aggregator
3647            .store()
3648            .append_if_absent(frame)
3649            .await
3650            .expect("append frame");
3651
3652        update_frame_status_and_emit(
3653            &aggregator.inner,
3654            &inserted.frame.id,
3655            ConsoleFrameStatus::Delivered,
3656        )
3657        .await
3658        .expect("update status");
3659
3660        let page = aggregator
3661            .query_timeline(ConsoleTimelineQuery {
3662                identity: Some("agent-a".to_string()),
3663                after: Some(inserted.frame.cursor.clone()),
3664                limit: 10,
3665                ..ConsoleTimelineQuery::default()
3666            })
3667            .await
3668            .expect("query timeline");
3669        assert_eq!(page.frames.len(), 1);
3670        assert_eq!(page.frames[0].kind, "frame_updated");
3671        assert_eq!(page.frames[0].parent_frame_id, Some(inserted.frame.id));
3672        assert_eq!(
3673            page.frames[0]
3674                .payload
3675                .get("frame")
3676                .and_then(|frame| frame.get("status"))
3677                .and_then(Value::as_str),
3678            Some("delivered")
3679        );
3680    }
3681
3682    #[tokio::test]
3683    async fn history_counterpart_scan_is_not_capped_to_one_page() {
3684        let aggregator = MobKitConsoleAggregator::in_memory();
3685        for idx in 0..1_005 {
3686            aggregator
3687                .store()
3688                .append_if_absent(NewConsoleFrame {
3689                    id: None,
3690                    dedupe_key: format!("filler-{idx}"),
3691                    timestamp_ms: idx,
3692                    runtime_key: "runtime-a".to_string(),
3693                    identity: "agent-a".to_string(),
3694                    conversation_id: Some("agent-a".to_string()),
3695                    session_id: Some("session-a".to_string()),
3696                    kind: "text_delta".to_string(),
3697                    status: ConsoleFrameStatus::Completed,
3698                    payload: json!({ "delta": idx }),
3699                    source: ConsoleFrameSource {
3700                        kind: ConsoleFrameSourceKind::ConsoleEvent,
3701                        source_cursor: None,
3702                    },
3703                    source_event_id: Some(format!("filler-{idx}")),
3704                    interaction_id: None,
3705                    turn_id: None,
3706                    run_id: None,
3707                    parent_frame_id: None,
3708                    caused_by_frame_id: None,
3709                })
3710                .await
3711                .expect("append filler");
3712        }
3713        aggregator
3714            .store()
3715            .append_if_absent(NewConsoleFrame {
3716                id: None,
3717                dedupe_key: "live-user-input".to_string(),
3718                timestamp_ms: 2_000,
3719                runtime_key: "runtime-a".to_string(),
3720                identity: "agent-a".to_string(),
3721                conversation_id: Some("agent-a".to_string()),
3722                session_id: Some("session-a".to_string()),
3723                kind: "user_input".to_string(),
3724                status: ConsoleFrameStatus::Delivered,
3725                payload: json!({ "content": "already here" }),
3726                source: ConsoleFrameSource {
3727                    kind: ConsoleFrameSourceKind::ConsoleEvent,
3728                    source_cursor: None,
3729                },
3730                source_event_id: Some("live-user-input".to_string()),
3731                interaction_id: None,
3732                turn_id: None,
3733                run_id: None,
3734                parent_frame_id: None,
3735                caused_by_frame_id: None,
3736            })
3737            .await
3738            .expect("append live input");
3739
3740        let history = NewConsoleFrame {
3741            id: None,
3742            dedupe_key: "history-user-input".to_string(),
3743            timestamp_ms: 3_000,
3744            runtime_key: "runtime-a".to_string(),
3745            identity: "agent-a".to_string(),
3746            conversation_id: Some("agent-a".to_string()),
3747            session_id: Some("session-a".to_string()),
3748            kind: "user_input".to_string(),
3749            status: ConsoleFrameStatus::Completed,
3750            payload: json!({ "content": "already here" }),
3751            source: ConsoleFrameSource {
3752                kind: ConsoleFrameSourceKind::SessionHistory,
3753                source_cursor: Some("session-a:1006".to_string()),
3754            },
3755            source_event_id: None,
3756            interaction_id: None,
3757            turn_id: None,
3758            run_id: None,
3759            parent_frame_id: None,
3760            caused_by_frame_id: None,
3761        };
3762
3763        assert!(
3764            history_frame_has_existing_counterpart(&aggregator.inner, &history)
3765                .await
3766                .expect("counterpart scan")
3767        );
3768    }
3769
3770    #[tokio::test]
3771    async fn history_counterpart_scan_matches_rpc_wrapped_user_prompts() {
3772        let aggregator = MobKitConsoleAggregator::in_memory();
3773        aggregator
3774            .store()
3775            .append_if_absent(NewConsoleFrame {
3776                id: None,
3777                dedupe_key: "live-user-input".to_string(),
3778                timestamp_ms: 2_000,
3779                runtime_key: "runtime-a".to_string(),
3780                identity: "agent-a".to_string(),
3781                conversation_id: Some("agent-a".to_string()),
3782                session_id: Some("session-a".to_string()),
3783                kind: "user_input".to_string(),
3784                status: ConsoleFrameStatus::Delivered,
3785                payload: json!({ "content": "hello from operator" }),
3786                source: ConsoleFrameSource {
3787                    kind: ConsoleFrameSourceKind::ConsoleEvent,
3788                    source_cursor: None,
3789                },
3790                source_event_id: Some("live-user-input".to_string()),
3791                interaction_id: None,
3792                turn_id: None,
3793                run_id: None,
3794                parent_frame_id: None,
3795                caused_by_frame_id: None,
3796            })
3797            .await
3798            .expect("append live input");
3799
3800        let history = NewConsoleFrame {
3801            id: None,
3802            dedupe_key: "history-user-input".to_string(),
3803            timestamp_ms: 3_000,
3804            runtime_key: "runtime-a".to_string(),
3805            identity: "agent-a".to_string(),
3806            conversation_id: Some("agent-a".to_string()),
3807            session_id: Some("session-a".to_string()),
3808            kind: "user_input".to_string(),
3809            status: ConsoleFrameStatus::Completed,
3810            payload: json!({ "content": "[EVENT via rpc] hello from operator" }),
3811            source: ConsoleFrameSource {
3812                kind: ConsoleFrameSourceKind::SessionHistory,
3813                source_cursor: Some("session-a:2".to_string()),
3814            },
3815            source_event_id: None,
3816            interaction_id: None,
3817            turn_id: None,
3818            run_id: None,
3819            parent_frame_id: None,
3820            caused_by_frame_id: None,
3821        };
3822
3823        assert!(
3824            history_frame_has_existing_counterpart(&aggregator.inner, &history)
3825                .await
3826                .expect("counterpart scan")
3827        );
3828    }
3829
3830    #[tokio::test]
3831    async fn history_counterpart_scan_matches_streamed_text_delta_completion() {
3832        let aggregator = MobKitConsoleAggregator::in_memory();
3833        for (idx, delta) in ["Ready ", "and standing by."].iter().enumerate() {
3834            aggregator
3835                .store()
3836                .append_if_absent(NewConsoleFrame {
3837                    id: None,
3838                    dedupe_key: format!("live-delta-{idx}"),
3839                    timestamp_ms: 2_000 + idx as u64,
3840                    runtime_key: "runtime-a".to_string(),
3841                    identity: "agent-a".to_string(),
3842                    conversation_id: Some("agent-a".to_string()),
3843                    session_id: Some("session-a".to_string()),
3844                    kind: "text_delta".to_string(),
3845                    status: ConsoleFrameStatus::Delivered,
3846                    payload: json!({ "delta": delta }),
3847                    source: ConsoleFrameSource {
3848                        kind: ConsoleFrameSourceKind::ConsoleEvent,
3849                        source_cursor: None,
3850                    },
3851                    source_event_id: Some(format!("live-delta-{idx}")),
3852                    interaction_id: Some("turn-a".to_string()),
3853                    turn_id: None,
3854                    run_id: None,
3855                    parent_frame_id: None,
3856                    caused_by_frame_id: None,
3857                })
3858                .await
3859                .expect("append live delta");
3860        }
3861
3862        let history = NewConsoleFrame {
3863            id: None,
3864            dedupe_key: "history-assistant-complete".to_string(),
3865            timestamp_ms: 3_000,
3866            runtime_key: "runtime-a".to_string(),
3867            identity: "agent-a".to_string(),
3868            conversation_id: Some("agent-a".to_string()),
3869            session_id: Some("session-a".to_string()),
3870            kind: "interaction_complete".to_string(),
3871            status: ConsoleFrameStatus::Completed,
3872            payload: json!({ "result": "Ready and standing by." }),
3873            source: ConsoleFrameSource {
3874                kind: ConsoleFrameSourceKind::SessionHistory,
3875                source_cursor: Some("session-a:3".to_string()),
3876            },
3877            source_event_id: None,
3878            interaction_id: None,
3879            turn_id: None,
3880            run_id: None,
3881            parent_frame_id: None,
3882            caused_by_frame_id: None,
3883        };
3884
3885        assert!(
3886            history_frame_has_existing_counterpart(&aggregator.inner, &history)
3887                .await
3888                .expect("counterpart scan")
3889        );
3890    }
3891
/// Two different sessions on the same runtime must map to different watermark runtime keys.
#[test]
fn session_history_watermark_key_is_session_scoped() {
    let first_session_key = session_history_watermark_runtime_key("runtime-a", "session-1");
    let second_session_key = session_history_watermark_runtime_key("runtime-a", "session-2");

    assert_ne!(first_session_key, second_session_key);
}
3899
/// Covers watermark parsing and freshness for a session id that itself contains colons.
///
/// Checks that a legacy `<session>:<cursor>` watermark and a formatted watermark both parse to
/// their cursor, that a watermark formatted with cursor 43 at time 1,000 is fresh at 1,500 and
/// stale just past `SESSION_HISTORY_GROWING_REFRESH_TTL_MS`, and that one formatted with cursor
/// 0 is fresh at 1,500 and stale just past `SESSION_HISTORY_REFRESH_TTL_MS`.
#[test]
fn session_history_watermarks_are_cursor_and_ttl_aware() {
    let session = "session:with:colon";
    let legacy = "session:with:colon:42";
    let checked = format_session_history_watermark(session, 43, 1_000);
    let empty_checked = format_session_history_watermark(session, 0, 1_000);

    let legacy_cursor = parse_session_history_watermark(legacy, session);
    assert_eq!(legacy_cursor, Some(42));
    let checked_cursor = parse_session_history_watermark(&checked, session);
    assert_eq!(checked_cursor, Some(43));

    let growing_expiry = 1_000 + SESSION_HISTORY_GROWING_REFRESH_TTL_MS + 1;
    assert!(session_history_watermark_is_fresh(&checked, session, 1_500));
    assert!(!session_history_watermark_is_fresh(
        &checked,
        session,
        growing_expiry
    ));

    let idle_expiry = 1_000 + SESSION_HISTORY_REFRESH_TTL_MS + 1;
    assert!(session_history_watermark_is_fresh(
        &empty_checked,
        session,
        1_500
    ));
    assert!(!session_history_watermark_is_fresh(
        &empty_checked,
        session,
        idle_expiry
    ));
}
3935
/// A `user` message projects to a `user_input` frame with text-block content, and an
/// `assistant` message projects to an `interaction_complete` frame whose dedupe key starts
/// with the runtime, session, and message index.
#[test]
fn session_history_messages_project_to_renderable_frames() {
    let user_message = json!({
        "role": "user",
        "content": "hello",
        "timestamp_ms": 10
    });
    let assistant_message = json!({
        "role": "assistant",
        "content": "hi there",
        "stop_reason": "end_turn",
        "usage": { "input_tokens": 1, "output_tokens": 1, "total_tokens": 2 },
        "timestamp_ms": 11
    });

    let user =
        frame_from_session_history_message("runtime-a", "agent-a", "session-a", 0, user_message)
            .expect("user history frame");
    let assistant = frame_from_session_history_message(
        "runtime-a",
        "agent-a",
        "session-a",
        1,
        assistant_message,
    )
    .expect("assistant history frame");

    assert_eq!(user.kind, "user_input");
    assert_eq!(user.source.kind, ConsoleFrameSourceKind::SessionHistory);
    let expected_user_content = json!([{ "type": "text", "text": "hello" }]);
    assert_eq!(user.payload["content"], expected_user_content);

    assert_eq!(assistant.kind, "interaction_complete");
    assert_eq!(assistant.payload["text"], json!("hi there"));
    let expected_prefix = "session-history:runtime-a:session-a:1:";
    assert!(assistant.dedupe_key.starts_with(expected_prefix));
}
3979
/// Spawn notices and `[PEER UPDATE]` user messages produce no frame, while an ordinary user
/// message does.
#[test]
fn session_history_projection_filters_scaffold_user_messages() {
    let spawn_message = json!({
        "role": "user",
        "content": "You have been spawned as 'agent-a' (role: worker) in mob 'mob-a'.",
        "timestamp_ms": 10
    });
    let peer_update_message = json!({
        "role": "user",
        "content": [{ "type": "text", "text": "[PEER UPDATE] alpha wired to beta" }],
        "timestamp_ms": 11
    });
    let real_user_message = json!({
        "role": "user",
        "content": "Please review the incident notes.",
        "timestamp_ms": 12
    });

    let spawn_notice =
        frame_from_session_history_message("runtime-a", "agent-a", "session-a", 0, spawn_message);
    let peer_update = frame_from_session_history_message(
        "runtime-a",
        "agent-a",
        "session-a",
        1,
        peer_update_message,
    );
    let real_user = frame_from_session_history_message(
        "runtime-a",
        "agent-a",
        "session-a",
        2,
        real_user_message,
    );

    assert!(spawn_notice.is_none());
    assert!(peer_update.is_none());
    assert!(real_user.is_some());
}
4020
/// A message without a `role` field produces no frame.
#[test]
fn session_history_projection_skips_non_transcript_messages() {
    let roleless_message = json!({
        "content": "internal system prompt"
    });

    let projected = frame_from_session_history_message(
        "runtime-a",
        "agent-a",
        "session-a",
        0,
        roleless_message,
    );

    assert!(projected.is_none());
}
4034
/// Text from consecutive `text` blocks of a `block_assistant` message is concatenated into the
/// frame's `text` payload field.
#[test]
fn session_history_projection_extracts_assistant_blocks() {
    let message = json!({
        "role": "block_assistant",
        "blocks": [
            { "block_type": "text", "data": { "text": "hello " } },
            { "block_type": "text", "data": { "text": "there" } }
        ],
        "stop_reason": "end_turn"
    });

    let projected =
        frame_from_session_history_message("runtime-a", "agent-a", "session-a", 0, message)
            .expect("assistant block history frame");

    assert_eq!(projected.payload["text"], json!("hello there"));
}
4054
/// A single `text` block nested under `data` becomes the `result` of an `interaction_complete`
/// frame.
#[test]
fn session_history_projection_extracts_nested_text_block_data() {
    let message = json!({
        "role": "block_assistant",
        "blocks": [
            {
                "block_type": "text",
                "data": { "text": "Ready and standing by." }
            }
        ],
        "stop_reason": "end_turn",
        "created_at": "1970-01-01T00:00:00.010Z"
    });

    let projected =
        frame_from_session_history_message("runtime-a", "agent-a", "session-a", 0, message)
            .expect("assistant block history frame");

    assert_eq!(projected.kind, "interaction_complete");
    assert_eq!(projected.payload["result"], json!("Ready and standing by."));
}
4079
/// Only the `text` block contributes to `result` and `text`; the `reasoning` block's text is
/// left out.
#[test]
fn session_history_projection_drops_reasoning_blocks_from_result_text() {
    let message = json!({
        "role": "block_assistant",
        "blocks": [
            {
                "block_type": "reasoning",
                "data": { "text": "**Planning**\nI should not be rendered." }
            },
            {
                "block_type": "text",
                "data": { "text": "Visible answer." }
            }
        ],
        "stop_reason": "end_turn",
        "created_at": "1970-01-01T00:00:00.010Z"
    });

    let projected =
        frame_from_session_history_message("runtime-a", "agent-a", "session-a", 0, message)
            .expect("assistant block history frame");

    assert_eq!(projected.kind, "interaction_complete");
    let visible = json!("Visible answer.");
    assert_eq!(projected.payload["result"], visible);
    assert_eq!(projected.payload["text"], visible);
}
4109
/// A message with only `reasoning` and `tool_use` blocks still projects to an
/// `interaction_complete` frame, with empty `result` and `text`.
#[test]
fn session_history_projection_leaves_reasoning_only_result_empty() {
    let message = json!({
        "role": "block_assistant",
        "blocks": [
            {
                "block_type": "reasoning",
                "data": { "text": "Private planning text." }
            },
            {
                "block_type": "tool_use",
                "data": { "id": "toolu-1", "name": "peers", "args": {} }
            }
        ],
        "stop_reason": "end_turn",
        "created_at": "1970-01-01T00:00:00.010Z"
    });

    let projected =
        frame_from_session_history_message("runtime-a", "agent-a", "session-a", 0, message)
            .expect("assistant block history frame");

    assert_eq!(projected.kind, "interaction_complete");
    let empty = json!("");
    assert_eq!(projected.payload["result"], empty);
    assert_eq!(projected.payload["text"], empty);
}
4139
/// A `tool_results` message with one result projects to a single `tool_execution_completed`
/// frame that keeps the tool call id and raw result string, is sourced from session history,
/// and takes its timestamp from `created_at`.
#[test]
fn session_history_projection_preserves_tool_results() {
    let message = json!({
        "role": "tool_results",
        "results": [
            {
                "tool_use_id": "call-peers",
                "content": "{\"peers\":[{\"peer_id\":\"peer-1\",\"name\":\"mob/worker/peer-1\"}]}",
                "is_error": false
            }
        ],
        "created_at": "1970-01-01T00:00:00.050Z"
    });

    let projected =
        frames_from_session_history_message("runtime-a", "agent-a", "session-a", 5, message);

    assert_eq!(projected.len(), 1);
    let tool_frame = &projected[0];
    assert_eq!(tool_frame.kind, "tool_execution_completed");
    assert_eq!(tool_frame.payload["tool_call_id"], json!("call-peers"));
    let expected_result =
        json!("{\"peers\":[{\"peer_id\":\"peer-1\",\"name\":\"mob/worker/peer-1\"}]}");
    assert_eq!(tool_frame.payload["result"], expected_result);
    assert_eq!(
        tool_frame.source.kind,
        ConsoleFrameSourceKind::SessionHistory
    );
    assert_eq!(tool_frame.timestamp_ms, 50);
}
4171
/// An RFC 3339 `created_at` value with microsecond precision becomes the frame timestamp in
/// whole milliseconds.
#[test]
fn session_history_projection_uses_rfc3339_created_at_timestamp() {
    let message = json!({
        "role": "user",
        "content": "hello",
        "created_at": "2026-05-12T05:00:06.564227Z"
    });

    let projected =
        frame_from_session_history_message("runtime-a", "agent-a", "session-a", 0, message)
            .expect("user history frame");

    assert_eq!(projected.timestamp_ms, 1_778_562_006_564);
}
4189}