ai-agent 0.13.4

Idiomatic agent sdk inspired by the claude code source leak
Documentation
//! REPL Bridge Core - Bootstrap-free core for Remote Control.
//!
//! Translated from openclaudecode/src/bridge/replBridge.ts
//!
//! This module provides the core bridge functionality: env registration -> session
//! creation -> poll loop -> ingress WS -> teardown. It reads nothing from
//! bootstrap/state or sessionStorage — all context comes from params.

use crate::bridge::poll_config_defaults::PollIntervalConfig;
use crate::bridge::repl_bridge_handle::{BridgeControlRequest, BridgeControlResponse, BridgeState};
use crate::bridge::repl_bridge_transport::ReplBridgeTransport;
use crate::bridge::SDKMessage;
use crate::error::AgentError;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;

// =============================================================================
// CONSTANTS
// =============================================================================

/// Poll error recovery constants.
const POLL_ERROR_INITIAL_DELAY_MS: u64 = 2_000;
const POLL_ERROR_MAX_DELAY_MS: u64 = 60_000;
const POLL_ERROR_GIVE_UP_MS: u64 = 15 * 60 * 1000;

// =============================================================================
// TYPES
// =============================================================================

/// Parameters for initializing the bridge core.
#[derive(Clone)]
pub struct BridgeCoreParams {
    /// Current working directory.
    pub dir: String,
    /// Machine name.
    pub machine_name: String,
    /// Current git branch.
    pub branch: String,
    /// Git repo URL.
    pub git_repo_url: Option<String>,
    /// Session title.
    pub title: String,
    /// Base API URL.
    pub base_url: String,
    /// Session ingress URL.
    pub session_ingress_url: String,
    /// Worker type (e.g., "repl", "daemon").
    pub worker_type: String,
    /// Get the current OAuth access token.
    pub get_access_token: Arc<dyn Fn() -> Option<String> + Send + Sync>,
    /// Create a new session.
    pub create_session: Arc<
        dyn Fn(
                String,
                String,
                Option<String>,
                String,
            ) -> future::BoxFuture<'static, Result<Option<String>, AgentError>>
            + Send
            + Sync,
    >,
    /// Archive a session.
    pub archive_session:
        Arc<dyn Fn(String) -> future::BoxFuture<'static, Result<(), AgentError>> + Send + Sync>,
    /// Get current session title (for reconnection).
    pub get_current_title: Option<Arc<dyn Fn() -> String + Send + Sync>>,
    /// Convert internal messages to SDK format.
    pub to_sdk_messages:
        Option<Arc<dyn Fn(Vec<crate::types::Message>) -> Vec<SDKMessage> + Send + Sync>>,
    /// Handle OAuth 401 refresh.
    pub on_auth_401: Option<
        Arc<dyn Fn(String) -> future::BoxFuture<'static, Result<bool, AgentError>> + Send + Sync>,
    >,
    /// Get poll interval config.
    pub get_poll_interval_config: Option<Arc<dyn Fn() -> PollIntervalConfig + Send + Sync>>,
    /// Max initial messages to replay on connect.
    pub initial_history_cap: Option<u32>,
    /// Initial messages to flush on connect.
    pub initial_messages: Option<Vec<crate::types::Message>>,
    /// Previously flushed UUIDs (for dedup).
    pub previously_flushed_uuids:
        Option<Arc<dyn Fn() -> std::collections::HashSet<String> + Send + Sync>>,
    /// Callback for inbound messages.
    pub on_inbound_message: Option<Arc<dyn Fn(SDKMessage) + Send + Sync>>,
    /// Callback for permission responses.
    pub on_permission_response: Option<Arc<dyn Fn(BridgeControlResponse) + Send + Sync>>,
    /// Callback for interrupt.
    pub on_interrupt: Option<Arc<dyn Fn() + Send + Sync>>,
    /// Callback for model change.
    pub on_set_model: Option<Arc<dyn Fn(Option<String>) + Send + Sync>>,
    /// Callback for max thinking tokens change.
    pub on_set_max_thinking_tokens: Option<Arc<dyn Fn(Option<u32>) + Send + Sync>>,
    /// Callback for permission mode change.
    pub on_set_permission_mode:
        Option<Arc<dyn Fn(crate::permission::PermissionMode) -> Result<(), String> + Send + Sync>>,
    /// Callback for state changes.
    pub on_state_change: Option<Arc<dyn Fn(BridgeState, Option<String>) + Send + Sync>>,
    /// Callback for user messages (title derivation).
    pub on_user_message: Option<Arc<dyn Fn(String, String) -> bool + Send + Sync>>,
    /// Whether this is a perpetual (persistent) bridge.
    pub perpetual: Option<bool>,
    /// Initial SSE sequence number (for daemon persistence).
    pub initial_sse_sequence_num: Option<u64>,
}

/// Bridge handle returned after initialization.
pub struct BridgeCoreHandle {
    /// The bridge session ID.
    pub session_id: RwLock<String>,
    /// The environment ID (empty for env-less v2).
    pub environment_id: RwLock<String>,
    /// The session ingress URL.
    pub session_ingress_url: String,
    /// The transport (if connected).
    pub transport: RwLock<Option<Box<dyn ReplBridgeTransport>>>,
    /// Current work item ID.
    pub current_work_id: RwLock<Option<String>>,
    /// Current ingress token.
    pub current_ingress_token: RwLock<Option<String>>,
    /// Last SSE sequence number.
    pub last_sequence_num: RwLock<u64>,
    /// Poll controller signal.
    pub poll_abort: tokio::sync::watch::Sender<bool>,
    /// Teardown flag.
    pub teardown_started: RwLock<bool>,
    /// Parameters for callbacks.
    params: BridgeCoreParams,
}

impl BridgeCoreHandle {
    /// Create a new bridge handle.
    pub fn new(
        session_id: String,
        environment_id: String,
        session_ingress_url: String,
        params: BridgeCoreParams,
    ) -> Self {
        let (poll_abort, _) = tokio::sync::watch::channel(false);
        Self {
            session_id: RwLock::new(session_id),
            environment_id: RwLock::new(environment_id),
            session_ingress_url,
            transport: RwLock::new(None),
            current_work_id: RwLock::new(None),
            current_ingress_token: RwLock::new(None),
            last_sequence_num: RwLock::new(0),
            poll_abort,
            teardown_started: RwLock::new(false),
            params,
        }
    }

    /// Write messages to the bridge.
    pub async fn write_messages(&self, messages: Vec<SDKMessage>) {
        let transport = self.transport.read().await;
        if let Some(t) = transport.as_ref() {
            t.write_batch(messages).await;
        }
    }

    /// Write SDK messages directly.
    pub async fn write_sdk_messages(&self, messages: Vec<SDKMessage>) {
        let transport = self.transport.read().await;
        if let Some(t) = transport.as_ref() {
            t.write_batch(messages).await;
        }
    }

    /// Send a control request.
    pub async fn send_control_request(&self, request: BridgeControlRequest) {
        let transport = self.transport.read().await;
        if let Some(t) = transport.as_ref() {
            let msg =
                bridge_message_from_control_request(request, self.session_id.read().await.clone());
            t.write(msg).await;
        }
    }

    /// Send a control response.
    pub async fn send_control_response(&self, response: BridgeControlResponse) {
        let transport = self.transport.read().await;
        if let Some(t) = transport.as_ref() {
            let msg = bridge_message_from_control_response(
                response,
                self.session_id.read().await.clone(),
            );
            t.write(msg).await;
        }
    }

    /// Send a control cancel request.
    pub async fn send_control_cancel_request(&self, request_id: &str) {
        let transport = self.transport.read().await;
        if let Some(t) = transport.as_ref() {
            let msg = bridge_control_cancel_request(
                request_id.to_string(),
                self.session_id.read().await.clone(),
            );
            t.write(msg).await;
        }
    }

    /// Send a result message.
    pub async fn send_result(&self) {
        let transport = self.transport.read().await;
        if let Some(t) = transport.as_ref() {
            let msg = bridge_result_message(self.session_id.read().await.clone());
            t.write(msg).await;
        }
    }

    /// Get the SSE sequence number for persistence.
    pub fn get_sse_sequence_num(&self) -> u64 {
        // Return the last captured sequence number
        // In practice, this would merge with the live transport's seq
        *self.last_sequence_num.blocking_read()
    }

    /// Tear down the bridge.
    pub async fn teardown(&self) {
        let mut started = self.teardown_started.write().await;
        if *started {
            return;
        }
        *started = true;
        drop(started);

        // Abort poll loop
        let _ = self.poll_abort.send(true);

        // Close transport
        let mut transport = self.transport.write().await;
        if let Some(t) = transport.take() {
            t.close();
        }

        // Call on_state_change if provided
        if let Some(ref callback) = self.params.on_state_change {
            callback(BridgeState::Failed, Some("teardown".to_string()));
        }
    }
}

/// Bridge state for debugging.
#[derive(Debug, Clone, PartialEq)]
pub enum BridgeStateInternal {
    Ready,
    Connecting,
    Connected,
    Reconnecting,
    Failed,
}

// =============================================================================
// INITIALIZATION
// =============================================================================

/// Initialize the bridge core.
///
/// Returns None on registration or session-creation failure.
pub async fn init_bridge_core(
    params: BridgeCoreParams,
) -> Result<Option<BridgeCoreHandle>, AgentError> {
    // Get poll interval config (default if not provided)
    let poll_config = params
        .get_poll_interval_config
        .as_ref()
        .map(|f| f())
        .unwrap_or_default();

    // Call on_state_change if provided
    if let Some(ref callback) = params.on_state_change {
        callback(BridgeState::Ready, None);
    }

    // Note: The actual implementation would:
    // 1. Register the bridge environment
    // 2. Create a session
    // 3. Start the poll loop for work items
    // 4. Connect transport when work arrives
    // 5. Handle reconnection logic
    //
    // This is a simplified version - the full implementation would be
    // several thousand lines of code handling:
    // - Environment registration and re-registration
    // - Session creation and recovery
    // - Work polling with heartbeat
    // - Transport management (v1/v2)
    // - Reconnection logic
    // - Graceful teardown

    Ok(None)
}

// =============================================================================
// HELPER FUNCTIONS
// =============================================================================

/// Compute exponential backoff delay.
pub fn compute_backoff(consecutive_errors: u32) -> Duration {
    let delay = POLL_ERROR_INITIAL_DELAY_MS
        * 2u64.saturating_pow(consecutive_errors.saturating_sub(1) as u32);
    Duration::from_millis(delay.min(POLL_ERROR_MAX_DELAY_MS))
}

/// Check if we should give up polling after errors.
pub fn should_give_up(first_error_time: Option<u64>, give_up_ms: u64) -> bool {
    if let Some(start) = first_error_time {
        let elapsed = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_millis() as u64)
            .unwrap_or(0)
            - start;
        return elapsed >= give_up_ms;
    }
    false
}

// =============================================================================
// MESSAGE HELPER FUNCTIONS
// =============================================================================

/// Create a bridge message from a control request.
pub fn bridge_message_from_control_request(
    _request: BridgeControlRequest,
    session_id: String,
) -> SDKMessage {
    // Simplified - actual implementation would serialize properly
    SDKMessage::user_message_with_session(session_id)
}

/// Create a bridge message from a control response.
pub fn bridge_message_from_control_response(
    _response: BridgeControlResponse,
    session_id: String,
) -> SDKMessage {
    SDKMessage::user_message_with_session(session_id)
}

/// Create a control cancel request message.
pub fn bridge_control_cancel_request(request_id: String, session_id: String) -> SDKMessage {
    // The actual implementation would create a proper message
    SDKMessage::user_message_with_session(session_id)
}

/// Create a result message.
pub fn bridge_result_message(session_id: String) -> SDKMessage {
    SDKMessage::user_message_with_session(session_id)
}

// =============================================================================
// FUTURE TYPE HELPERS
// =============================================================================

mod future {
    use crate::error::AgentError;
    use core::pin::Pin;

    pub type BoxFuture<'a, T> = Pin<Box<dyn std::future::Future<Output = T> + Send + 'a>>;
}