opencrabs 0.3.30

The autonomous, self-improving AI agent. Single Rust binary. Every channel. Install with: cargo install opencrabs
Documentation
use super::builder::AgentService;
use super::types::*;
use crate::brain::agent::error::{AgentError, Result};
use tokio_util::sync::CancellationToken;
use uuid::Uuid;

impl AgentService {
    /// Send a message and get a response
    ///
    /// This will:
    /// 1. Load conversation context from the database
    /// 2. Add the new user message
    /// 3. Send to the LLM provider
    /// 4. Save the response to the database
    /// 5. Update token usage
    pub async fn send_message(
        &self,
        session_id: Uuid,
        user_message: String,
        model: Option<String>,
    ) -> Result<AgentResponse> {
        // Prepare message context (common setup logic)
        let (_model_name, request, message_service, session_service) = self
            .prepare_message_context(session_id, user_message, model)
            .await?;

        // Send to provider — use session's provider so a concurrent
        // foreground swap on another pane can't hijack this turn.
        let provider = self.provider_for_session(session_id);
        let response = provider
            .complete(request)
            .await
            .map_err(AgentError::Provider)?;

        // Extract text from response
        let assistant_text = Self::extract_text_from_response(&response);

        // Save assistant response to database
        let assistant_db_msg = message_service
            .create_message(session_id, "assistant".to_string(), assistant_text.clone())
            .await
            .map_err(|e| AgentError::Database(e.to_string()))?;

        // Calculate total tokens and cost for this message
        let billable_input = response.usage.input_tokens
            + response.usage.cache_creation_tokens
            + response.usage.cache_read_tokens;
        let total_tokens = billable_input + response.usage.output_tokens;
        let cost = self
            .provider_for_session(session_id)
            .calculate_cost_with_cache(
                &response.model,
                response.usage.input_tokens,
                response.usage.output_tokens,
                response.usage.cache_creation_tokens,
                response.usage.cache_read_tokens,
            );

        // Update message with usage info, stashing the server-reported
        // prompt token count so session reload reads it directly.
        message_service
            .update_message_usage(
                assistant_db_msg.id,
                total_tokens as i32,
                cost,
                Some(billable_input as i32),
            )
            .await
            .map_err(|e| AgentError::Database(e.to_string()))?;

        // Update session token usage
        session_service
            .update_session_usage(session_id, total_tokens as i32, cost)
            .await
            .map_err(|e| AgentError::Database(e.to_string()))?;

        Ok(AgentResponse {
            message_id: assistant_db_msg.id,
            content: assistant_text,
            stop_reason: response.stop_reason,
            context_tokens: response.usage.input_tokens,
            usage: response.usage,
            cost,
            model: response.model,
            provider_name: self.provider_name_for_session(session_id),
        })
    }

    /// Send a message and get a streaming response
    ///
    /// Returns a stream of response chunks that can be consumed incrementally.
    pub async fn send_message_streaming(
        &self,
        session_id: Uuid,
        user_message: String,
        model: Option<String>,
    ) -> Result<AgentStreamResponse> {
        // Prepare message context (common setup logic)
        let (model_name, request, _message_service, _session_service) = self
            .prepare_message_context(session_id, user_message, model)
            .await?;

        // Add streaming flag to request
        let request = request.with_streaming();

        // Get streaming response from provider (session-scoped so a
        // concurrent /models pick on a different pane can't hijack).
        let provider = self.provider_for_session(session_id);
        let stream = provider
            .stream(request)
            .await
            .map_err(AgentError::Provider)?;

        Ok(AgentStreamResponse {
            session_id,
            message_id: Uuid::new_v4(),
            stream,
            model: model_name,
        })
    }

    /// Send a message with automatic tool execution (TUI channel).
    pub async fn send_message_with_tools(
        &self,
        session_id: Uuid,
        user_message: String,
        model: Option<String>,
    ) -> Result<AgentResponse> {
        self.send_message_with_tools_and_mode(session_id, user_message, model, None)
            .await
    }

    /// Shim: send with tools + optional cancellation token (TUI channel).
    /// Delegates to `run_tool_loop` with service-level callbacks.
    pub async fn send_message_with_tools_and_mode(
        &self,
        session_id: Uuid,
        user_message: String,
        model: Option<String>,
        cancel_token: Option<CancellationToken>,
    ) -> Result<AgentResponse> {
        self.run_tool_loop(
            session_id,
            user_message,
            None,
            model,
            cancel_token,
            None,
            None,
            None,
            "tui",
            None,
        )
        .await
    }

    /// Send a message with per-call callback overrides and channel routing.
    ///
    /// `override_*_callback` parameters take precedence over the service-level
    /// callbacks (used by channels). Pass `None` to fall back to the
    /// service-level callback. `override_question_callback` is the
    /// per-call surface for the `follow_up_question` tool — channels
    /// with native button surfaces construct one per message; non-
    /// interactive callers (CLI, RSI, A2A) pass None.
    ///
    /// `channel` and `channel_chat_id` identify the originating channel for
    /// crash recovery routing.
    #[allow(clippy::too_many_arguments)]
    pub async fn send_message_with_tools_and_callback(
        &self,
        session_id: Uuid,
        user_message: String,
        model: Option<String>,
        cancel_token: Option<CancellationToken>,
        override_approval_callback: Option<ApprovalCallback>,
        override_progress_callback: Option<ProgressCallback>,
        override_question_callback: Option<QuestionCallback>,
        channel: &str,
        channel_chat_id: Option<&str>,
    ) -> Result<AgentResponse> {
        self.run_tool_loop(
            session_id,
            user_message,
            None,
            model,
            cancel_token,
            override_approval_callback,
            override_progress_callback,
            override_question_callback,
            channel,
            channel_chat_id,
        )
        .await
    }

    /// Send a message and provide a separate human-readable `display_text`
    /// for DB persistence and TUI/session display. The full `user_message`
    /// (typically the channel-wrapped agent input with sender metadata,
    /// reply context, group history, channel hints) still goes to the LLM
    /// context so the agent retains all the information it needs, but the
    /// chat history shown in the TUI mirrors what the user actually typed
    /// in the channel.
    ///
    /// `override_question_callback` is the per-call surface for the
    /// `follow_up_question` tool — same semantics as the callback-only
    /// shim above.
    #[allow(clippy::too_many_arguments)]
    pub async fn send_message_with_tools_and_display(
        &self,
        session_id: Uuid,
        user_message: String,
        display_text: Option<String>,
        model: Option<String>,
        cancel_token: Option<CancellationToken>,
        override_approval_callback: Option<ApprovalCallback>,
        override_progress_callback: Option<ProgressCallback>,
        override_question_callback: Option<QuestionCallback>,
        channel: &str,
        channel_chat_id: Option<&str>,
    ) -> Result<AgentResponse> {
        self.run_tool_loop(
            session_id,
            user_message,
            display_text,
            model,
            cancel_token,
            override_approval_callback,
            override_progress_callback,
            override_question_callback,
            channel,
            channel_chat_id,
        )
        .await
    }
}