// lash-core 0.1.0-alpha.39
//
// Sans-IO turn machine and runtime kernel for the lash agent runtime.
// Documentation
use super::*;

impl ManagedSessionCapability {
    /// Runs one turn on a registered managed session and returns the assembled turn.
    ///
    /// The visible sequence is:
    /// 1. split the request into its session id, turn id, input and scoped effect controller;
    /// 2. look the session up in `self.registry` and read its session policy;
    /// 3. build a channel-backed event sink (with a live child-usage forwarder) plus a
    ///    background task that receives and discards everything sent on that channel;
    /// 4. register the turn in `self.turns`, refusing if any registered turn already
    ///    carries the same session id;
    /// 5. under the session's runtime lock, refresh the tool surface, stream the turn and
    ///    take its final turn, then call `publish_from` with the guard whether or not the
    ///    run succeeded;
    /// 6. unregister the turn, shut the sink down, reconcile token usage and persist the
    ///    usage ledger.
    ///
    /// # Errors
    ///
    /// Returns `PluginError::Session` when the session id is not in the registry, when the
    /// session already has a registered turn, when refreshing the tool surface or streaming
    /// the turn fails, or when the run finishes without a final turn. A failure from
    /// `persist_current_usage_ledger` is propagated with `?` and takes precedence over the
    /// turn's own result.
    pub(in crate::runtime::session_manager) async fn start_turn(
        &self,
        current: &CurrentSessionCapability,
        usage: &UsageCapability,
        request: crate::SessionTurnRequest<'_>,
    ) -> Result<AssembledTurn, crate::PluginError> {
        let (turn_input, scoped_effect_controller) = request.into_parts();
        let crate::SessionTurnInput {
            session_id,
            turn_id,
            input,
        } = turn_input;

        // Clone the registry entry inside a short scope so the registry lock is released
        // before any further awaiting happens.
        let lookup = {
            let sessions = self.registry.lock().await;
            sessions.get(&session_id).cloned()
        };
        let managed = match lookup {
            Some(found) => found,
            None => {
                return Err(crate::PluginError::Session(format!(
                    "unknown session `{session_id}`"
                )));
            }
        };

        // The runtime guard is a temporary here and is released at the end of the statement.
        let policy = managed.runtime.lock().await.session_policy();

        // NOTE(review): this token is only handed to the turn options below; no other
        // handle to it is kept in this function.
        let cancel_token = CancellationToken::new();
        let (event_tx, mut event_rx) = mpsc::channel::<SessionEvent>(100);
        let usage_source = self.child_usage_source(usage, &session_id);
        let forwarder = LiveChildUsageForwarder {
            turn_id: turn_id.to_string(),
            session_id: session_id.to_string(),
            source: usage_source,
            model: policy.model.id.clone(),
            token_ledger: Arc::clone(&usage.token_ledger),
            child_turn_live_usage: Arc::clone(&usage.child_turn_live_usage),
            relay: usage.child_usage_event_relay.clone(),
        };
        let event_sink = ChannelEventSink {
            tx: event_tx,
            live_usage: Some(forwarder),
        };

        // Receive and discard events until `recv` reports that nothing more will arrive.
        let drain_task = tokio::spawn(async move {
            loop {
                if event_rx.recv().await.is_none() {
                    break;
                }
            }
        });

        // The busy check and the insertion share one lock acquisition, so two callers
        // cannot both register a turn for the same session.
        {
            let mut active_turns = self.turns.lock().await;
            let session_busy = active_turns
                .values()
                .any(|active| active.session_id == session_id.as_str());
            if session_busy {
                return Err(crate::PluginError::Session(format!(
                    "session `{session_id}` already has a running turn"
                )));
            }
            let registration = ManagedSessionTurn {
                session_id: session_id.to_string(),
            };
            active_turns.insert(turn_id.to_string(), registration);
        }

        let outcome = {
            let mut runtime_guard = managed.runtime.lock().await;
            // The fallible steps live in their own async block so that `?` exits only that
            // block; the publish call below therefore runs on success and on failure.
            let run_result = async {
                runtime_guard
                    .refresh_session_tool_surface()
                    .await
                    .map_err(|err| crate::PluginError::Session(err.to_string()))?;
                let options =
                    crate::runtime::TurnOptions::new(cancel_token, scoped_effect_controller)
                        .with_events(&event_sink);
                let run = runtime_guard
                    .stream_turn_with_agent_frames(input, options)
                    .await
                    .map_err(|err| crate::PluginError::Session(err.to_string()))?;
                run.into_final_turn().ok_or_else(|| {
                    crate::PluginError::Session(
                        "agent frame run completed without a turn".to_string(),
                    )
                })
            }
            .await;
            managed.publish_from(&runtime_guard);
            run_result
        };

        self.turns.lock().await.remove(&turn_id);

        // Drop the sink (and with it the sender it owns) before waiting on the drain task,
        // so the task's receive loop is able to finish.
        drop(event_sink);
        let _ = drain_task.await;

        // Taking the live usage also removes this turn's entry, on success and on failure.
        let live_reported = self.turn_live_usage(usage, &turn_id);
        if let Ok(finished) = outcome.as_ref() {
            let source = self.child_usage_source(usage, &session_id);
            // Presumably the share of the turn's usage that was not already reported live;
            // confirm against `subtract_usage`.
            if let Some(remainder) = subtract_usage(&live_reported, &finished.token_usage) {
                usage.record_token_usage(&source, &finished.state.policy.model.id, &remainder);
            }
        }

        usage.persist_current_usage_ledger(current).await?;
        outcome
    }

    /// Returns the usage source recorded for `session_id` in `usage.child_sources`, or the
    /// literal `"child"` when the session has no entry.
    ///
    /// # Panics
    ///
    /// Panics if the `child_sources` lock cannot be acquired (the `expect` on `lock()`).
    fn child_usage_source(&self, usage: &UsageCapability, session_id: &str) -> String {
        let sources = usage
            .child_sources
            .lock()
            .expect("child usage sources lock");
        match sources.get(session_id) {
            Some(source) => source.clone(),
            None => "child".to_string(),
        }
    }

    /// Removes and returns the live usage stored for `turn_id` in
    /// `usage.child_turn_live_usage`, or `TokenUsage::default()` when there is no entry.
    ///
    /// # Panics
    ///
    /// Panics if the `child_turn_live_usage` lock cannot be acquired (the `expect` on
    /// `lock()`).
    fn turn_live_usage(&self, usage: &UsageCapability, turn_id: &str) -> TokenUsage {
        let mut live_by_turn = usage
            .child_turn_live_usage
            .lock()
            .expect("child turn live usage lock");
        live_by_turn.remove(turn_id).unwrap_or_default()
    }
}

#[cfg(test)]
mod tests {
    /// A request whose effect scope names the same session and turn is accepted, and the
    /// input it carries reports the turn id as its trace turn id.
    #[test]
    fn session_turn_request_requires_matching_scope_and_sets_trace_turn_id() {
        let inline_controller = crate::InlineRuntimeEffectController;
        let matching_scope = crate::EffectScope::turn("child", "child-turn");
        let scoped = crate::ScopedEffectController::borrowed(&inline_controller, matching_scope)
            .expect("turn scope");
        let turn_input = crate::TurnInput::text("run child");

        let built = crate::SessionTurnRequest::new("child", "child-turn", turn_input, scoped)
            .expect("valid child turn request");

        assert_eq!(built.session_id(), "child");
        assert_eq!(built.turn_id(), "child-turn");
        assert_eq!(built.input().trace_turn_id.as_deref(), Some("child-turn"));
    }

    /// A request whose effect scope names a different turn id is rejected, and the error
    /// text mentions "same id".
    #[test]
    fn session_turn_request_rejects_mismatched_effect_scope() {
        let inline_controller = crate::InlineRuntimeEffectController;
        let other_scope = crate::EffectScope::turn("child", "other-turn");
        let scoped = crate::ScopedEffectController::borrowed(&inline_controller, other_scope)
            .expect("turn scope");
        let turn_input = crate::TurnInput::text("run child");

        // A `match` is used rather than a helper such as `expect_err`, which would need the
        // success type to implement `Debug`.
        let attempt = crate::SessionTurnRequest::new("child", "child-turn", turn_input, scoped);
        let rejection = match attempt {
            Err(rejection) => rejection,
            Ok(_) => panic!("mismatched turn scope should fail"),
        };

        assert!(rejection.to_string().contains("same id"));
    }
}