Skip to main content

openclaw_gateway/
server.rs

1//! Gateway server.
2
3use std::collections::HashMap;
4use std::net::SocketAddr;
5use std::path::PathBuf;
6use std::sync::Arc;
7
8use axum::{
9    Json, Router,
10    extract::State,
11    extract::ws::{Message, WebSocket, WebSocketUpgrade},
12    response::IntoResponse,
13    routing::{get, post},
14};
15use chrono::Utc;
16use futures::{SinkExt, StreamExt};
17use tokio::sync::RwLock;
18
19use openclaw_agents::runtime::{AgentContext, AgentRuntime};
20use openclaw_agents::tools::ToolRegistry;
21use openclaw_channels::{ChannelCapabilities, ChannelRegistry};
22use openclaw_core::events::{
23    EventStore, SessionEvent, SessionEventKind, SessionMessage, SessionProjection, SessionState,
24};
25use openclaw_core::types::{AgentId, ChannelId, SessionKey};
26
27use crate::GatewayError;
28use crate::auth::{AuthConfig, AuthState, JwtManager, User, UserRole, setup::auto_setup_from_env};
29use crate::events::EventBroadcaster;
30use crate::rpc::{self, RpcRequest, RpcResponse};
31
32#[cfg(feature = "ui")]
33use crate::ui_server::UiServerConfig;
34
/// Gateway configuration.
///
/// Consumed by [`Gateway::new`] and [`GatewayBuilder::with_config`]; a copy is also kept in
/// [`GatewayState`] so request handlers can read it.
#[derive(Debug, Clone)]
pub struct GatewayConfig {
    /// Port to listen on.
    pub port: u16,
    /// Bind address. Combined with `port` to form the API listener's socket address.
    pub bind_address: String,
    /// Enable CORS.
    // NOTE(review): this flag is not read anywhere in the visible part of this file —
    // confirm where (or whether) it is applied.
    pub cors: bool,
    /// Data directory for persistent storage.
    ///
    /// Created on startup if missing; auth state is initialized against this directory.
    pub data_dir: PathBuf,
    /// Authentication configuration.
    pub auth: AuthConfig,
    /// UI server configuration (optional, requires "ui" feature).
    #[cfg(feature = "ui")]
    pub ui: Option<UiServerConfig>,
}
52
53impl Default for GatewayConfig {
54    fn default() -> Self {
55        let data_dir = dirs::data_dir()
56            .unwrap_or_else(|| PathBuf::from("."))
57            .join("openclaw")
58            .join("gateway");
59
60        Self {
61            port: 18789,
62            bind_address: "127.0.0.1".to_string(),
63            cors: true,
64            data_dir,
65            auth: AuthConfig::default(),
66            #[cfg(feature = "ui")]
67            ui: Some(UiServerConfig::default()),
68        }
69    }
70}
71
/// Gateway server state shared across handlers.
///
/// The gateway wraps this in `Arc<RwLock<..>>` and hands it to the router as handler state;
/// handlers take a read lock to reach the individual services below.
pub struct GatewayState {
    /// Event store for session persistence.
    pub event_store: Arc<EventStore>,
    /// Agent runtimes by agent ID.
    pub agents: HashMap<String, Arc<AgentRuntime>>,
    /// Shared tool registry.
    pub tool_registry: Arc<ToolRegistry>,
    /// Authentication state.
    pub auth: Arc<AuthState>,
    /// Channel registry.
    pub channels: Arc<RwLock<ChannelRegistry>>,
    /// UI event broadcaster. WebSocket connections subscribe to it to stream events.
    pub events: EventBroadcaster,
    /// Gateway configuration.
    pub config: GatewayConfig,
}
89
/// Gateway server.
///
/// Owns the configuration used to bind the listener and the shared state handed to
/// every request handler.
pub struct Gateway {
    /// Configuration used when starting the server (bind address, port, optional UI).
    config: GatewayConfig,
    /// Shared handler state, cloned into the router when the server runs.
    state: Arc<RwLock<GatewayState>>,
}
95
/// Builder for constructing a Gateway with its dependencies.
///
/// Only the event store is mandatory; every other dependency falls back to a freshly
/// created default when `build` is called.
pub struct GatewayBuilder {
    /// Gateway configuration (defaults to `GatewayConfig::default()`).
    config: GatewayConfig,
    /// Event store; `build` fails if this was never set.
    event_store: Option<Arc<EventStore>>,
    /// Agent runtimes registered so far, keyed by agent ID.
    agents: HashMap<String, Arc<AgentRuntime>>,
    /// Tool registry (defaults to an empty registry).
    tool_registry: Arc<ToolRegistry>,
    /// Pre-built auth state; when absent, `build` initializes one from the config.
    auth_state: Option<Arc<AuthState>>,
    /// Pre-built channel registry; when absent, `build` creates an empty one.
    channel_registry: Option<Arc<RwLock<ChannelRegistry>>>,
    /// Pre-built event broadcaster; when absent, `build` uses the default broadcaster.
    event_broadcaster: Option<EventBroadcaster>,
}
106
107impl GatewayBuilder {
108    /// Create a new builder with default config.
109    #[must_use]
110    pub fn new() -> Self {
111        Self {
112            config: GatewayConfig::default(),
113            event_store: None,
114            agents: HashMap::new(),
115            tool_registry: Arc::new(ToolRegistry::new()),
116            auth_state: None,
117            channel_registry: None,
118            event_broadcaster: None,
119        }
120    }
121
122    /// Set gateway configuration.
123    #[must_use]
124    pub fn with_config(mut self, config: GatewayConfig) -> Self {
125        self.config = config;
126        self
127    }
128
129    /// Set the event store.
130    #[must_use]
131    pub fn with_event_store(mut self, store: Arc<EventStore>) -> Self {
132        self.event_store = Some(store);
133        self
134    }
135
136    /// Register an agent runtime.
137    #[must_use]
138    pub fn with_agent(mut self, id: impl Into<String>, runtime: Arc<AgentRuntime>) -> Self {
139        self.agents.insert(id.into(), runtime);
140        self
141    }
142
143    /// Set the tool registry.
144    #[must_use]
145    pub fn with_tool_registry(mut self, registry: Arc<ToolRegistry>) -> Self {
146        self.tool_registry = registry;
147        self
148    }
149
150    /// Set the auth state.
151    #[must_use]
152    pub fn with_auth_state(mut self, auth: Arc<AuthState>) -> Self {
153        self.auth_state = Some(auth);
154        self
155    }
156
157    /// Set the channel registry.
158    #[must_use]
159    pub fn with_channel_registry(mut self, registry: Arc<RwLock<ChannelRegistry>>) -> Self {
160        self.channel_registry = Some(registry);
161        self
162    }
163
164    /// Set the event broadcaster.
165    #[must_use]
166    pub fn with_event_broadcaster(mut self, broadcaster: EventBroadcaster) -> Self {
167        self.event_broadcaster = Some(broadcaster);
168        self
169    }
170
171    /// Build the gateway.
172    ///
173    /// # Errors
174    ///
175    /// Returns error if event store is not configured or auth initialization fails.
176    pub fn build(self) -> Result<Gateway, GatewayError> {
177        let event_store = self
178            .event_store
179            .ok_or_else(|| GatewayError::Config("Event store is required".to_string()))?;
180
181        // Ensure data directory exists
182        std::fs::create_dir_all(&self.config.data_dir)
183            .map_err(|e| GatewayError::Config(format!("Failed to create data dir: {e}")))?;
184
185        // Initialize auth state if not provided
186        let auth = if let Some(auth) = self.auth_state {
187            auth
188        } else {
189            let auth_config = self.config.auth.clone().with_env_overrides();
190            Arc::new(
191                AuthState::initialize(auth_config, &self.config.data_dir)
192                    .map_err(|e| GatewayError::Config(format!("Auth init failed: {e}")))?,
193            )
194        };
195
196        // Auto-setup from environment if configured
197        if let Err(e) = auto_setup_from_env(&auth.users) {
198            tracing::warn!("Auto-setup from env failed: {}", e);
199        }
200
201        // Initialize channel registry
202        let channels = self
203            .channel_registry
204            .unwrap_or_else(|| Arc::new(RwLock::new(ChannelRegistry::new())));
205
206        // Initialize event broadcaster
207        let events = self.event_broadcaster.unwrap_or_default();
208
209        let state = GatewayState {
210            event_store,
211            agents: self.agents,
212            tool_registry: self.tool_registry,
213            auth,
214            channels,
215            events,
216            config: self.config.clone(),
217        };
218
219        Ok(Gateway {
220            config: self.config,
221            state: Arc::new(RwLock::new(state)),
222        })
223    }
224}
225
226impl Default for GatewayBuilder {
227    fn default() -> Self {
228        Self::new()
229    }
230}
231
232impl Gateway {
233    /// Create a new gateway (for backward compatibility).
234    pub fn new(config: GatewayConfig) -> Result<Self, GatewayError> {
235        // Ensure data directory exists
236        std::fs::create_dir_all(&config.data_dir)
237            .map_err(|e| GatewayError::Config(format!("Failed to create data dir: {e}")))?;
238
239        // Create event store in data directory
240        let event_store = Arc::new(
241            EventStore::open(&config.data_dir.join("events"))
242                .map_err(|e| GatewayError::Server(format!("Failed to open event store: {e}")))?,
243        );
244
245        // Initialize auth
246        let auth_config = config.auth.clone().with_env_overrides();
247        let auth = Arc::new(
248            AuthState::initialize(auth_config, &config.data_dir)
249                .map_err(|e| GatewayError::Config(format!("Auth init failed: {e}")))?,
250        );
251
252        // Auto-setup from environment if configured
253        if let Err(e) = auto_setup_from_env(&auth.users) {
254            tracing::warn!("Auto-setup from env failed: {}", e);
255        }
256
257        let state = GatewayState {
258            event_store,
259            agents: HashMap::new(),
260            tool_registry: Arc::new(ToolRegistry::new()),
261            auth,
262            channels: Arc::new(RwLock::new(ChannelRegistry::new())),
263            events: EventBroadcaster::new(),
264            config: config.clone(),
265        };
266
267        Ok(Self {
268            config,
269            state: Arc::new(RwLock::new(state)),
270        })
271    }
272
273    /// Run the gateway server.
274    ///
275    /// Starts the API server and optionally the UI server (if the "ui" feature is enabled
276    /// and UI configuration is present).
277    pub async fn run(&self) -> Result<(), GatewayError> {
278        let state = self.state.clone();
279
280        // Check for bootstrap requirement
281        {
282            let state_read = state.read().await;
283            let mut bootstrap = state_read.auth.bootstrap.write().await;
284            if let Some(_token) = bootstrap.check_and_generate(&state_read.auth.users) {
285                let base_url = format!("http://{}:{}", self.config.bind_address, self.config.port);
286                bootstrap.print_bootstrap_info(&base_url);
287            }
288        }
289
290        // Build API router
291        let app = Router::new()
292            .route("/health", get(health_handler))
293            .route("/rpc", post(rpc_handler))
294            .route("/ws", get(ws_handler))
295            .with_state(state);
296
297        let addr: SocketAddr = format!("{}:{}", self.config.bind_address, self.config.port)
298            .parse()
299            .map_err(|e| GatewayError::Config(format!("Invalid address: {e}")))?;
300
301        tracing::info!("Gateway API listening on http://{}", addr);
302
303        // Start API server
304        let api_listener = tokio::net::TcpListener::bind(addr).await?;
305        let api_handle = tokio::spawn(async move { axum::serve(api_listener, app).await });
306
307        // Optionally start UI server
308        #[cfg(feature = "ui")]
309        let ui_handle = if let Some(ref ui_config) = self.config.ui {
310            if ui_config.enabled {
311                let config = ui_config.clone();
312                Some(tokio::spawn(async move {
313                    crate::ui_server::run_ui_server(config).await
314                }))
315            } else {
316                None
317            }
318        } else {
319            None
320        };
321
322        // Wait for servers to complete (or error)
323        #[cfg(feature = "ui")]
324        {
325            tokio::select! {
326                result = api_handle => {
327                    result
328                        .map_err(|e| GatewayError::Server(format!("API server panic: {e}")))?
329                        .map_err(|e| GatewayError::Server(e.to_string()))?;
330                }
331                result = async {
332                    match ui_handle {
333                        Some(handle) => handle.await,
334                        None => std::future::pending().await,
335                    }
336                } => {
337                    result
338                        .map_err(|e| GatewayError::Server(format!("UI server panic: {e}")))?
339                        .map_err(|e| GatewayError::Server(e.to_string()))?;
340                }
341            }
342        }
343
344        #[cfg(not(feature = "ui"))]
345        {
346            api_handle
347                .await
348                .map_err(|e| GatewayError::Server(format!("API server panic: {e}")))?
349                .map_err(|e| GatewayError::Server(e.to_string()))?;
350        }
351
352        Ok(())
353    }
354}
355
/// Liveness probe for `/health`: always answers with the body `OK`.
async fn health_handler() -> &'static str {
    const HEALTHY: &str = "OK";
    HEALTHY
}
359
360async fn rpc_handler(
361    State(state): State<Arc<RwLock<GatewayState>>>,
362    headers: axum::http::HeaderMap,
363    Json(request): Json<RpcRequest>,
364) -> Json<RpcResponse> {
365    let id = request.id.clone();
366
367    // Extract auth token from header
368    let auth_token = headers
369        .get(axum::http::header::AUTHORIZATION)
370        .and_then(|v| v.to_str().ok())
371        .and_then(JwtManager::extract_from_header);
372
373    let result = dispatch_rpc(&state, &request.method, &request.params, auth_token).await;
374
375    Json(match result {
376        Ok(value) => RpcResponse::success(id, value),
377        Err((code, message)) => RpcResponse::error(id, code, message),
378    })
379}
380
/// WebSocket query parameters.
///
/// Deserialized from the query string of the `/ws` upgrade request.
#[derive(Debug, serde::Deserialize)]
struct WsParams {
    /// Auth token for WebSocket connection. Optional; when present it is used for every
    /// RPC call made over the socket.
    token: Option<String>,
}
387
388async fn ws_handler(
389    State(state): State<Arc<RwLock<GatewayState>>>,
390    axum::extract::Query(params): axum::extract::Query<WsParams>,
391    ws: WebSocketUpgrade,
392) -> impl IntoResponse {
393    ws.on_upgrade(move |socket| handle_socket(socket, state, params.token))
394}
395
396async fn handle_socket(
397    socket: WebSocket,
398    state: Arc<RwLock<GatewayState>>,
399    auth_token: Option<String>,
400) {
401    let (sender, mut receiver) = socket.split();
402    let sender = Arc::new(tokio::sync::Mutex::new(sender));
403
404    // Validate token if auth is required for WebSocket
405    {
406        let state_read = state.read().await;
407        if state_read.auth.config.enabled && state_read.auth.config.require_auth_for_ws {
408            if let Some(token) = &auth_token {
409                if let Err(e) = state_read.auth.validate_token(token) {
410                    let error_response =
411                        RpcResponse::error(None, rpc::UNAUTHORIZED, format!("Invalid token: {e}"));
412                    let response_text = serde_json::to_string(&error_response).unwrap_or_default();
413                    let mut guard = sender.lock().await;
414                    let _ = guard.send(Message::Text(response_text.into())).await;
415                    return;
416                }
417            } else {
418                let error_response = RpcResponse::error(
419                    None,
420                    rpc::UNAUTHORIZED,
421                    "Authentication required".to_string(),
422                );
423                let response_text = serde_json::to_string(&error_response).unwrap_or_default();
424                let mut guard = sender.lock().await;
425                let _ = guard.send(Message::Text(response_text.into())).await;
426                return;
427            }
428        }
429    }
430
431    // Create a channel to stop the event listener
432    let (stop_tx, mut stop_rx) = tokio::sync::oneshot::channel::<()>();
433    let subscribed = Arc::new(std::sync::atomic::AtomicBool::new(false));
434    let subscribed_clone = subscribed.clone();
435    let sender_clone = sender.clone();
436    let state_clone = state.clone();
437
438    // Spawn event listener task
439    let event_task = tokio::spawn(async move {
440        // Wait until subscribed
441        loop {
442            if subscribed_clone.load(std::sync::atomic::Ordering::Relaxed) {
443                break;
444            }
445            if stop_rx.try_recv().is_ok() {
446                return;
447            }
448            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
449        }
450
451        // Subscribe to events
452        let mut event_rx = {
453            let state_read = state_clone.read().await;
454            state_read.events.subscribe()
455        };
456
457        loop {
458            tokio::select! {
459                _ = &mut stop_rx => {
460                    break;
461                }
462                event_result = event_rx.recv() => {
463                    match event_result {
464                        Ok(envelope) => {
465                            let event_msg = serde_json::json!({
466                                "jsonrpc": "2.0",
467                                "method": "event",
468                                "params": envelope,
469                            });
470                            let msg_text = serde_json::to_string(&event_msg).unwrap_or_default();
471                            let mut guard = sender_clone.lock().await;
472                            if guard.send(Message::Text(msg_text.into())).await.is_err() {
473                                break;
474                            }
475                        }
476                        Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
477                            tracing::warn!("Event listener lagged, missed {} events", n);
478                        }
479                        Err(tokio::sync::broadcast::error::RecvError::Closed) => {
480                            break;
481                        }
482                    }
483                }
484            }
485        }
486    });
487
488    // Handle incoming RPC requests
489    while let Some(msg) = receiver.next().await {
490        let msg = match msg {
491            Ok(Message::Text(text)) => text,
492            Ok(Message::Close(_)) => break,
493            Ok(_) => continue, // Ignore binary, ping, pong
494            Err(e) => {
495                tracing::warn!("WebSocket receive error: {}", e);
496                break;
497            }
498        };
499
500        // Parse JSON-RPC request
501        let request: RpcRequest = match serde_json::from_str(&msg) {
502            Ok(req) => req,
503            Err(e) => {
504                let error_response =
505                    RpcResponse::error(None, rpc::PARSE_ERROR, format!("Parse error: {e}"));
506                let response_text = serde_json::to_string(&error_response).unwrap_or_default();
507                let mut guard = sender.lock().await;
508                if guard
509                    .send(Message::Text(response_text.into()))
510                    .await
511                    .is_err()
512                {
513                    break;
514                }
515                continue;
516            }
517        };
518
519        // Check for events.subscribe to enable event streaming
520        if request.method == "events.subscribe" {
521            subscribed.store(true, std::sync::atomic::Ordering::Relaxed);
522        }
523
524        let id = request.id.clone();
525        let token_ref = auth_token.as_deref();
526        let result = dispatch_rpc(&state, &request.method, &request.params, token_ref).await;
527
528        let response = match result {
529            Ok(value) => RpcResponse::success(id, value),
530            Err((code, message)) => RpcResponse::error(id, code, message),
531        };
532
533        let response_text = serde_json::to_string(&response).unwrap_or_default();
534        let mut guard = sender.lock().await;
535        if guard
536            .send(Message::Text(response_text.into()))
537            .await
538            .is_err()
539        {
540            break;
541        }
542    }
543
544    // Stop event task
545    let _ = stop_tx.send(());
546    let _ = event_task.await;
547
548    tracing::debug!("WebSocket connection closed");
549}
550
551/// Dispatch RPC request to appropriate handler.
552async fn dispatch_rpc(
553    state: &Arc<RwLock<GatewayState>>,
554    method: &str,
555    params: &serde_json::Value,
556    auth_token: Option<&str>,
557) -> RpcResult {
558    let state_read = state.read().await;
559
560    // Check if method requires auth
561    if state_read.auth.requires_auth(method) {
562        let token =
563            auth_token.ok_or_else(|| (rpc::UNAUTHORIZED, "Authentication required".to_string()))?;
564
565        state_read
566            .auth
567            .validate_token(token)
568            .map_err(|e| (rpc::UNAUTHORIZED, format!("Invalid token: {e}")))?;
569    }
570
571    drop(state_read);
572
573    match method {
574        // Auth methods
575        "auth.login" => handle_auth_login(state, params).await,
576        "auth.logout" => handle_auth_logout(state, params).await,
577        "auth.refresh" => handle_auth_refresh(state, params).await,
578        "auth.me" => handle_auth_me(state, auth_token).await,
579
580        // Setup methods
581        "setup.status" => handle_setup_status(state).await,
582        "setup.init" => handle_setup_init(state, params).await,
583
584        // User management (admin only)
585        "users.list" => handle_users_list(state, auth_token).await,
586        "users.create" => handle_users_create(state, params, auth_token).await,
587        "users.update" => handle_users_update(state, params, auth_token).await,
588        "users.delete" => handle_users_delete(state, params, auth_token).await,
589
590        // Session methods
591        "session.create" => handle_session_create(state, params).await,
592        "session.message" => handle_session_message(state, params).await,
593        "session.history" => handle_session_history(state, params).await,
594        "session.end" => handle_session_end(state, params).await,
595        "session.list" => handle_session_list(state, params).await,
596        "session.search" => handle_session_search(state, params).await,
597        "session.stats" => handle_session_stats(state).await,
598        "session.events" => handle_session_events(state, params).await,
599
600        // Channel methods
601        "channels.list" => handle_channels_list(state).await,
602        "channels.status" => handle_channels_status(state).await,
603        "channels.probe" => handle_channels_probe(state, params).await,
604
605        // Agent methods
606        "agent.list" => handle_agent_list(state).await,
607        "agent.status" => handle_agent_status(state, params).await,
608        "agent.get" => handle_agent_get(state, params).await,
609
610        // Tool methods
611        "tools.list" => handle_tools_list(state).await,
612        "tools.execute" => handle_tools_execute(state, params).await,
613
614        // System methods
615        "system.health" => handle_system_health(state).await,
616        "system.version" => handle_system_version().await,
617
618        // Event subscription (WebSocket-only, returns ack)
619        "events.subscribe" => handle_events_subscribe().await,
620
621        _ => Err((rpc::METHOD_NOT_FOUND, format!("Method not found: {method}"))),
622    }
623}
624
/// Outcome of an RPC handler: the JSON result on success, or an `(error code, message)`
/// pair that callers turn into an error response.
type RpcResult = Result<serde_json::Value, (i32, String)>;
626
627// ============================================================================
628// Auth RPC Handlers
629// ============================================================================
630
631async fn handle_auth_login(
632    state: &Arc<RwLock<GatewayState>>,
633    params: &serde_json::Value,
634) -> RpcResult {
635    let username = params["username"]
636        .as_str()
637        .ok_or((rpc::INVALID_PARAMS, "Missing username".to_string()))?;
638    let password = params["password"]
639        .as_str()
640        .ok_or((rpc::INVALID_PARAMS, "Missing password".to_string()))?;
641
642    let state = state.read().await;
643
644    // Find user
645    let user = state
646        .auth
647        .users
648        .get_by_username(username)
649        .map_err(|e| (rpc::INTERNAL_ERROR, format!("Storage error: {e}")))?
650        .ok_or((rpc::UNAUTHORIZED, "Invalid credentials".to_string()))?;
651
652    // Check if active
653    if !user.active {
654        return Err((rpc::UNAUTHORIZED, "Account disabled".to_string()));
655    }
656
657    // Verify password
658    user.verify_password(password)
659        .map_err(|_| (rpc::UNAUTHORIZED, "Invalid credentials".to_string()))?;
660
661    // Update last login
662    state.auth.users.update_last_login(&user.id).map_err(|e| {
663        (
664            rpc::INTERNAL_ERROR,
665            format!("Failed to update login time: {e}"),
666        )
667    })?;
668
669    // Generate tokens
670    let token_pair = state
671        .auth
672        .jwt
673        .create_token_pair(&user.id, &user.username, user.role)
674        .map_err(|e| (rpc::INTERNAL_ERROR, format!("Token generation failed: {e}")))?;
675
676    Ok(serde_json::json!({
677        "token": token_pair.access_token,
678        "refresh_token": token_pair.refresh_token,
679        "expires_at": token_pair.expires_at.to_rfc3339(),
680        "user": user.to_public(),
681    }))
682}
683
684async fn handle_auth_logout(
685    _state: &Arc<RwLock<GatewayState>>,
686    _params: &serde_json::Value,
687) -> RpcResult {
688    // For stateless JWT, logout is handled client-side by discarding the token
689    // In a more complete implementation, we'd track refresh token families
690    Ok(serde_json::json!({
691        "success": true,
692    }))
693}
694
695async fn handle_auth_refresh(
696    state: &Arc<RwLock<GatewayState>>,
697    params: &serde_json::Value,
698) -> RpcResult {
699    let refresh_token = params["refresh_token"]
700        .as_str()
701        .ok_or((rpc::INVALID_PARAMS, "Missing refresh_token".to_string()))?;
702
703    let state = state.read().await;
704
705    let token_pair = state
706        .auth
707        .jwt
708        .refresh_tokens(refresh_token)
709        .map_err(|e| (rpc::UNAUTHORIZED, format!("Refresh failed: {e}")))?;
710
711    Ok(serde_json::json!({
712        "token": token_pair.access_token,
713        "refresh_token": token_pair.refresh_token,
714        "expires_at": token_pair.expires_at.to_rfc3339(),
715    }))
716}
717
718async fn handle_auth_me(state: &Arc<RwLock<GatewayState>>, auth_token: Option<&str>) -> RpcResult {
719    let token = auth_token.ok_or((rpc::UNAUTHORIZED, "Not authenticated".to_string()))?;
720
721    let state = state.read().await;
722    let claims = state
723        .auth
724        .validate_token(token)
725        .map_err(|e| (rpc::UNAUTHORIZED, format!("Invalid token: {e}")))?;
726
727    let user = state
728        .auth
729        .users
730        .get(&claims.sub)
731        .map_err(|e| (rpc::INTERNAL_ERROR, format!("Storage error: {e}")))?
732        .ok_or((rpc::NOT_FOUND, "User not found".to_string()))?;
733
734    Ok(serde_json::json!({
735        "user": user.to_public(),
736    }))
737}
738
739// ============================================================================
740// Setup RPC Handlers
741// ============================================================================
742
743async fn handle_setup_status(state: &Arc<RwLock<GatewayState>>) -> RpcResult {
744    let state = state.read().await;
745    let bootstrap = state.auth.bootstrap.read().await;
746
747    let base_url = format!("http://{}:{}", state.config.bind_address, state.config.port);
748
749    let status = bootstrap.status(&state.auth.users, Some(&base_url));
750
751    serde_json::to_value(&status)
752        .map_err(|e| (rpc::INTERNAL_ERROR, format!("Serialization error: {e}")))
753}
754
755async fn handle_setup_init(
756    state: &Arc<RwLock<GatewayState>>,
757    params: &serde_json::Value,
758) -> RpcResult {
759    let bootstrap_token = params["bootstrap_token"]
760        .as_str()
761        .ok_or((rpc::INVALID_PARAMS, "Missing bootstrap_token".to_string()))?;
762    let username = params["admin_username"]
763        .as_str()
764        .ok_or((rpc::INVALID_PARAMS, "Missing admin_username".to_string()))?;
765    let password = params["admin_password"]
766        .as_str()
767        .ok_or((rpc::INVALID_PARAMS, "Missing admin_password".to_string()))?;
768    let email = params["email"].as_str().map(String::from);
769
770    let state = state.read().await;
771    let mut bootstrap = state.auth.bootstrap.write().await;
772
773    let admin = bootstrap
774        .complete_setup(
775            &state.auth.users,
776            bootstrap_token,
777            username,
778            password,
779            email,
780        )
781        .map_err(|e| (rpc::UNAUTHORIZED, format!("Setup failed: {e}")))?;
782
783    // Generate tokens for the new admin
784    let token_pair = state
785        .auth
786        .jwt
787        .create_token_pair(&admin.id, &admin.username, admin.role)
788        .map_err(|e| (rpc::INTERNAL_ERROR, format!("Token generation failed: {e}")))?;
789
790    Ok(serde_json::json!({
791        "token": token_pair.access_token,
792        "refresh_token": token_pair.refresh_token,
793        "expires_at": token_pair.expires_at.to_rfc3339(),
794        "user": admin.to_public(),
795    }))
796}
797
798// ============================================================================
799// User Management RPC Handlers (Admin only)
800// ============================================================================
801
802fn require_admin(state: &GatewayState, token: Option<&str>) -> Result<(), (i32, String)> {
803    let token = token.ok_or((rpc::UNAUTHORIZED, "Not authenticated".to_string()))?;
804    let claims = state
805        .auth
806        .validate_token(token)
807        .map_err(|e| (rpc::UNAUTHORIZED, format!("Invalid token: {e}")))?;
808
809    if !claims.role.is_admin() {
810        return Err((rpc::FORBIDDEN, "Admin role required".to_string()));
811    }
812
813    Ok(())
814}
815
816async fn handle_users_list(
817    state: &Arc<RwLock<GatewayState>>,
818    auth_token: Option<&str>,
819) -> RpcResult {
820    let state = state.read().await;
821    require_admin(&state, auth_token)?;
822
823    let users = state
824        .auth
825        .users
826        .list()
827        .map_err(|e| (rpc::INTERNAL_ERROR, format!("Storage error: {e}")))?;
828
829    let public_users: Vec<_> = users.iter().map(User::to_public).collect();
830
831    Ok(serde_json::json!({
832        "users": public_users,
833        "total": public_users.len(),
834    }))
835}
836
837async fn handle_users_create(
838    state: &Arc<RwLock<GatewayState>>,
839    params: &serde_json::Value,
840    auth_token: Option<&str>,
841) -> RpcResult {
842    let state = state.read().await;
843    require_admin(&state, auth_token)?;
844
845    let username = params["username"]
846        .as_str()
847        .ok_or((rpc::INVALID_PARAMS, "Missing username".to_string()))?;
848    let password = params["password"]
849        .as_str()
850        .ok_or((rpc::INVALID_PARAMS, "Missing password".to_string()))?;
851    let role_str = params["role"].as_str().unwrap_or("viewer");
852    let email = params["email"].as_str().map(String::from);
853
854    let role: UserRole = role_str
855        .parse()
856        .map_err(|e| (rpc::INVALID_PARAMS, format!("Invalid role: {e}")))?;
857
858    let mut user = User::new(username, password, role)
859        .map_err(|e| (rpc::INTERNAL_ERROR, format!("User creation failed: {e}")))?;
860    user.email = email;
861
862    state
863        .auth
864        .users
865        .create(&user)
866        .map_err(|e| (rpc::INTERNAL_ERROR, format!("Storage error: {e}")))?;
867
868    Ok(serde_json::json!({
869        "user": user.to_public(),
870    }))
871}
872
873async fn handle_users_update(
874    state: &Arc<RwLock<GatewayState>>,
875    params: &serde_json::Value,
876    auth_token: Option<&str>,
877) -> RpcResult {
878    let state = state.read().await;
879    require_admin(&state, auth_token)?;
880
881    let id = params["id"]
882        .as_str()
883        .ok_or((rpc::INVALID_PARAMS, "Missing id".to_string()))?;
884
885    let mut user = state
886        .auth
887        .users
888        .get(id)
889        .map_err(|e| (rpc::INTERNAL_ERROR, format!("Storage error: {e}")))?
890        .ok_or((rpc::NOT_FOUND, format!("User not found: {id}")))?;
891
892    // Update fields if provided
893    if let Some(role_str) = params["role"].as_str() {
894        user.role = role_str
895            .parse()
896            .map_err(|e| (rpc::INVALID_PARAMS, format!("Invalid role: {e}")))?;
897    }
898
899    if let Some(active) = params["active"].as_bool() {
900        user.active = active;
901    }
902
903    if let Some(email) = params["email"].as_str() {
904        user.email = Some(email.to_string());
905    }
906
907    state
908        .auth
909        .users
910        .update(&user)
911        .map_err(|e| (rpc::INTERNAL_ERROR, format!("Storage error: {e}")))?;
912
913    Ok(serde_json::json!({
914        "user": user.to_public(),
915    }))
916}
917
918async fn handle_users_delete(
919    state: &Arc<RwLock<GatewayState>>,
920    params: &serde_json::Value,
921    auth_token: Option<&str>,
922) -> RpcResult {
923    let state = state.read().await;
924    require_admin(&state, auth_token)?;
925
926    let id = params["id"]
927        .as_str()
928        .ok_or((rpc::INVALID_PARAMS, "Missing id".to_string()))?;
929
930    // Prevent deleting the last admin
931    let users = state
932        .auth
933        .users
934        .list()
935        .map_err(|e| (rpc::INTERNAL_ERROR, format!("Storage error: {e}")))?;
936
937    let admin_count = users
938        .iter()
939        .filter(|u| u.role.is_admin() && u.active)
940        .count();
941    let target_user = users.iter().find(|u| u.id == id);
942
943    if let Some(user) = target_user {
944        if user.role.is_admin() && admin_count <= 1 {
945            return Err((rpc::FORBIDDEN, "Cannot delete the last admin".to_string()));
946        }
947    }
948
949    let deleted = state
950        .auth
951        .users
952        .delete(id)
953        .map_err(|e| (rpc::INTERNAL_ERROR, format!("Storage error: {e}")))?;
954
955    Ok(serde_json::json!({
956        "success": deleted,
957    }))
958}
959
960// ============================================================================
961// System RPC Handlers
962// ============================================================================
963
964async fn handle_system_health(state: &Arc<RwLock<GatewayState>>) -> RpcResult {
965    let state = state.read().await;
966
967    Ok(serde_json::json!({
968        "status": "healthy",
969        "auth_enabled": state.auth.config.enabled,
970        "users_configured": !state.auth.users.is_empty(),
971        "agents_count": state.agents.len(),
972    }))
973}
974
975async fn handle_system_version() -> RpcResult {
976    Ok(serde_json::json!({
977        "version": env!("CARGO_PKG_VERSION"),
978        "name": "openclaw-gateway",
979    }))
980}
981
982/// Handle event subscription request.
983/// This is a WebSocket-only method that returns an ack.
984/// The actual event streaming is handled separately.
985async fn handle_events_subscribe() -> RpcResult {
986    Ok(serde_json::json!({
987        "subscribed": true,
988        "message": "Events will be pushed to this connection",
989    }))
990}
991
992// ============================================================================
993// Session RPC Handlers
994// ============================================================================
995
996async fn handle_session_create(
997    state: &Arc<RwLock<GatewayState>>,
998    params: &serde_json::Value,
999) -> RpcResult {
1000    let agent_id = params["agent_id"].as_str().unwrap_or("default").to_string();
1001    let channel = params["channel"].as_str().unwrap_or("api").to_string();
1002    let peer_id = params["peer_id"]
1003        .as_str()
1004        .unwrap_or("anonymous")
1005        .to_string();
1006
1007    let session_key = SessionKey::build(
1008        &AgentId::new(&agent_id),
1009        &ChannelId::new(&channel),
1010        "gateway",
1011        openclaw_core::types::PeerType::Dm,
1012        &openclaw_core::types::PeerId::new(&peer_id),
1013    );
1014
1015    let event = SessionEvent::new(
1016        session_key.clone(),
1017        agent_id.clone(),
1018        SessionEventKind::SessionStarted {
1019            channel: channel.clone(),
1020            peer_id: peer_id.clone(),
1021        },
1022    );
1023
1024    let state = state.read().await;
1025    state.event_store.append(&event).map_err(|e| {
1026        (
1027            rpc::INTERNAL_ERROR,
1028            format!("Failed to create session: {e}"),
1029        )
1030    })?;
1031
1032    Ok(serde_json::json!({
1033        "session_key": session_key.as_ref(),
1034        "agent_id": agent_id,
1035        "channel": channel,
1036        "peer_id": peer_id,
1037    }))
1038}
1039
1040async fn handle_session_message(
1041    state: &Arc<RwLock<GatewayState>>,
1042    params: &serde_json::Value,
1043) -> RpcResult {
1044    let session_key_str = params["session_key"]
1045        .as_str()
1046        .ok_or((rpc::INVALID_PARAMS, "Missing session_key".to_string()))?;
1047    let message = params["message"]
1048        .as_str()
1049        .ok_or((rpc::INVALID_PARAMS, "Missing message".to_string()))?;
1050    let agent_id_str = params["agent_id"].as_str().unwrap_or("default");
1051
1052    let session_key = SessionKey::new(session_key_str);
1053    let state = state.read().await;
1054
1055    // Log inbound message
1056    let recv_event = SessionEvent::new(
1057        session_key.clone(),
1058        agent_id_str.to_string(),
1059        SessionEventKind::MessageReceived {
1060            content: message.to_string(),
1061            attachments: vec![],
1062        },
1063    );
1064    state
1065        .event_store
1066        .append(&recv_event)
1067        .map_err(|e| (rpc::INTERNAL_ERROR, format!("Failed to log message: {e}")))?;
1068
1069    // Get agent runtime
1070    let agent = state.agents.get(agent_id_str).ok_or((
1071        rpc::INVALID_PARAMS,
1072        format!("Agent not found: {agent_id_str}"),
1073    ))?;
1074
1075    // Get or create session projection
1076    let projection = state
1077        .event_store
1078        .get_projection(&session_key)
1079        .unwrap_or_else(|_| {
1080            SessionProjection::new(
1081                session_key.clone(),
1082                agent_id_str.to_string(),
1083                ChannelId::new("api"),
1084                "anonymous".to_string(),
1085            )
1086        });
1087
1088    // Build agent context and process
1089    let mut ctx = AgentContext::new(
1090        AgentId::new(agent_id_str),
1091        session_key.clone(),
1092        projection,
1093        state.tool_registry.clone(),
1094    );
1095
1096    let response = agent
1097        .process_message(&mut ctx, message)
1098        .await
1099        .map_err(|e| (rpc::INTERNAL_ERROR, format!("Agent error: {e}")))?;
1100
1101    // Log agent response
1102    let resp_event = SessionEvent::new(
1103        session_key,
1104        agent_id_str.to_string(),
1105        SessionEventKind::AgentResponse {
1106            content: response.clone(),
1107            model: String::new(),
1108            tokens: openclaw_core::types::TokenUsage::default(),
1109        },
1110    );
1111    state
1112        .event_store
1113        .append(&resp_event)
1114        .map_err(|e| (rpc::INTERNAL_ERROR, format!("Failed to log response: {e}")))?;
1115
1116    Ok(serde_json::json!({
1117        "response": response,
1118    }))
1119}
1120
1121async fn handle_session_history(
1122    state: &Arc<RwLock<GatewayState>>,
1123    params: &serde_json::Value,
1124) -> RpcResult {
1125    let session_key_str = params["session_key"]
1126        .as_str()
1127        .ok_or((rpc::INVALID_PARAMS, "Missing session_key".to_string()))?;
1128
1129    let session_key = SessionKey::new(session_key_str);
1130    let state = state.read().await;
1131
1132    let projection = state
1133        .event_store
1134        .get_projection(&session_key)
1135        .map_err(|e| (rpc::INTERNAL_ERROR, format!("Failed to get session: {e}")))?;
1136
1137    serde_json::to_value(&projection)
1138        .map_err(|e| (rpc::INTERNAL_ERROR, format!("Serialization error: {e}")))
1139}
1140
1141async fn handle_session_end(
1142    state: &Arc<RwLock<GatewayState>>,
1143    params: &serde_json::Value,
1144) -> RpcResult {
1145    let session_key_str = params["session_key"]
1146        .as_str()
1147        .ok_or((rpc::INVALID_PARAMS, "Missing session_key".to_string()))?;
1148    let reason = params["reason"]
1149        .as_str()
1150        .unwrap_or("user_requested")
1151        .to_string();
1152
1153    let session_key = SessionKey::new(session_key_str);
1154    let state = state.read().await;
1155
1156    let event = SessionEvent::new(
1157        session_key,
1158        "gateway".to_string(),
1159        SessionEventKind::SessionEnded {
1160            reason: reason.clone(),
1161        },
1162    );
1163
1164    state
1165        .event_store
1166        .append(&event)
1167        .map_err(|e| (rpc::INTERNAL_ERROR, format!("Failed to end session: {e}")))?;
1168
1169    Ok(serde_json::json!({
1170        "status": "ended",
1171        "reason": reason,
1172    }))
1173}
1174
1175/// Extended session list with filtering and pagination.
1176async fn handle_session_list(
1177    state: &Arc<RwLock<GatewayState>>,
1178    params: &serde_json::Value,
1179) -> RpcResult {
1180    let limit = params["limit"].as_u64().unwrap_or(50) as usize;
1181    let offset = params["offset"].as_u64().unwrap_or(0) as usize;
1182    let filter_channel = params["channel"].as_str();
1183    let filter_agent = params["agent"].as_str();
1184    let filter_state = params["state"].as_str();
1185
1186    let state = state.read().await;
1187    let session_keys = state
1188        .event_store
1189        .list_sessions()
1190        .map_err(|e| (rpc::INTERNAL_ERROR, format!("Failed to list sessions: {e}")))?;
1191
1192    // Get projections and apply filters
1193    let mut sessions: Vec<SessionProjection> = session_keys
1194        .into_iter()
1195        .filter_map(|key| state.event_store.get_projection(&key).ok())
1196        .filter(|p| {
1197            // Apply filters
1198            if let Some(ch) = filter_channel {
1199                if p.channel.as_ref() != ch {
1200                    return false;
1201                }
1202            }
1203            if let Some(agent) = filter_agent {
1204                if p.agent_id != agent {
1205                    return false;
1206                }
1207            }
1208            if let Some(st) = filter_state {
1209                let state_match = match st {
1210                    "active" => p.state == SessionState::Active,
1211                    "paused" => p.state == SessionState::Paused,
1212                    "ended" => p.state == SessionState::Ended,
1213                    _ => true,
1214                };
1215                if !state_match {
1216                    return false;
1217                }
1218            }
1219            true
1220        })
1221        .collect();
1222
1223    // Sort by last activity (most recent first)
1224    sessions.sort_by(|a, b| b.last_activity.cmp(&a.last_activity));
1225
1226    let total = sessions.len();
1227
1228    // Apply pagination
1229    let sessions: Vec<_> = sessions.into_iter().skip(offset).take(limit).collect();
1230
1231    Ok(serde_json::json!({
1232        "sessions": sessions,
1233        "total": total,
1234        "limit": limit,
1235        "offset": offset,
1236    }))
1237}
1238
1239/// Search sessions by query.
1240async fn handle_session_search(
1241    state: &Arc<RwLock<GatewayState>>,
1242    params: &serde_json::Value,
1243) -> RpcResult {
1244    let query = params["query"]
1245        .as_str()
1246        .ok_or((rpc::INVALID_PARAMS, "Missing query".to_string()))?
1247        .to_lowercase();
1248    let filter_channel = params["channel"].as_str();
1249    let filter_agent = params["agent"].as_str();
1250    let limit = params["limit"].as_u64().unwrap_or(20) as usize;
1251
1252    let state = state.read().await;
1253    let session_keys = state
1254        .event_store
1255        .list_sessions()
1256        .map_err(|e| (rpc::INTERNAL_ERROR, format!("Failed to list sessions: {e}")))?;
1257
1258    // Search through sessions
1259    let mut results: Vec<SessionProjection> = session_keys
1260        .into_iter()
1261        .filter_map(|key| state.event_store.get_projection(&key).ok())
1262        .filter(|p| {
1263            // Apply channel/agent filters
1264            if let Some(ch) = filter_channel {
1265                if p.channel.as_ref() != ch {
1266                    return false;
1267                }
1268            }
1269            if let Some(agent) = filter_agent {
1270                if p.agent_id != agent {
1271                    return false;
1272                }
1273            }
1274
1275            // Search in peer_id
1276            if p.peer_id.to_lowercase().contains(&query) {
1277                return true;
1278            }
1279
1280            // Search in session key
1281            if p.session_key.as_ref().to_lowercase().contains(&query) {
1282                return true;
1283            }
1284
1285            // Search in message content
1286            for msg in &p.messages {
1287                let content = match msg {
1288                    SessionMessage::Inbound(c) | SessionMessage::Outbound(c) => c,
1289                    SessionMessage::Tool { result, .. } => result,
1290                };
1291                if content.to_lowercase().contains(&query) {
1292                    return true;
1293                }
1294            }
1295
1296            false
1297        })
1298        .collect();
1299
1300    // Sort by relevance (for now, just by last activity)
1301    results.sort_by(|a, b| b.last_activity.cmp(&a.last_activity));
1302    results.truncate(limit);
1303
1304    Ok(serde_json::json!({
1305        "sessions": results,
1306        "count": results.len(),
1307        "query": query,
1308    }))
1309}
1310
1311/// Get session statistics.
1312async fn handle_session_stats(state: &Arc<RwLock<GatewayState>>) -> RpcResult {
1313    let state = state.read().await;
1314    let session_keys = state
1315        .event_store
1316        .list_sessions()
1317        .map_err(|e| (rpc::INTERNAL_ERROR, format!("Failed to list sessions: {e}")))?;
1318
1319    let mut total = 0;
1320    let mut active = 0;
1321    let mut by_channel: HashMap<String, u64> = HashMap::new();
1322    let mut by_agent: HashMap<String, u64> = HashMap::new();
1323    let mut total_messages: u64 = 0;
1324
1325    for key in session_keys {
1326        if let Ok(projection) = state.event_store.get_projection(&key) {
1327            total += 1;
1328            if projection.state == SessionState::Active {
1329                active += 1;
1330            }
1331            *by_channel
1332                .entry(projection.channel.as_ref().to_string())
1333                .or_insert(0) += 1;
1334            *by_agent.entry(projection.agent_id.clone()).or_insert(0) += 1;
1335            total_messages += projection.message_count;
1336        }
1337    }
1338
1339    Ok(serde_json::json!({
1340        "total": total,
1341        "active": active,
1342        "by_channel": by_channel,
1343        "by_agent": by_agent,
1344        "total_messages": total_messages,
1345    }))
1346}
1347
1348/// Get events for a session.
1349async fn handle_session_events(
1350    state: &Arc<RwLock<GatewayState>>,
1351    params: &serde_json::Value,
1352) -> RpcResult {
1353    let session_key_str = params["session_key"]
1354        .as_str()
1355        .ok_or((rpc::INVALID_PARAMS, "Missing session_key".to_string()))?;
1356    let since = params["since"].as_str().and_then(|s| {
1357        chrono::DateTime::parse_from_rfc3339(s)
1358            .ok()
1359            .map(|dt| dt.with_timezone(&Utc))
1360    });
1361
1362    let session_key = SessionKey::new(session_key_str);
1363    let state = state.read().await;
1364
1365    let events = if let Some(since_time) = since {
1366        state
1367            .event_store
1368            .get_events_since(&session_key, since_time)
1369            .map_err(|e| (rpc::INTERNAL_ERROR, format!("Failed to get events: {e}")))?
1370    } else {
1371        state
1372            .event_store
1373            .get_events(&session_key)
1374            .map_err(|e| (rpc::INTERNAL_ERROR, format!("Failed to get events: {e}")))?
1375    };
1376
1377    Ok(serde_json::json!({
1378        "events": events,
1379        "count": events.len(),
1380    }))
1381}
1382
1383// ============================================================================
1384// Channel RPC Handlers
1385// ============================================================================
1386
/// Channel info for API responses.
///
/// Serializable via `serde::Serialize`.
/// NOTE(review): no handler in the visible part of this file constructs this
/// type — confirm whether it is still used.
#[derive(Debug, Clone, serde::Serialize)]
struct ChannelInfo {
    /// Channel identifier.
    id: String,
    /// Channel label — presumably a human-readable display name; TODO confirm.
    label: String,
    /// Capabilities of the channel, as described by `ChannelCapabilities`.
    capabilities: ChannelCapabilities,
}
1394
1395async fn handle_channels_list(state: &Arc<RwLock<GatewayState>>) -> RpcResult {
1396    let state = state.read().await;
1397    let registry = state.channels.read().await;
1398
1399    let channels: Vec<String> = registry
1400        .list()
1401        .iter()
1402        .map(std::string::ToString::to_string)
1403        .collect();
1404
1405    Ok(serde_json::json!({
1406        "channels": channels,
1407        "count": channels.len(),
1408    }))
1409}
1410
1411async fn handle_channels_status(state: &Arc<RwLock<GatewayState>>) -> RpcResult {
1412    let state = state.read().await;
1413    let registry = state.channels.read().await;
1414
1415    let probes = registry.probe_all().await;
1416
1417    let statuses: HashMap<String, serde_json::Value> = probes
1418        .into_iter()
1419        .map(|(id, result)| {
1420            let status = match result {
1421                Ok(probe) => serde_json::json!({
1422                    "connected": probe.connected,
1423                    "account_id": probe.account_id,
1424                    "display_name": probe.display_name,
1425                    "error": probe.error,
1426                }),
1427                Err(e) => serde_json::json!({
1428                    "connected": false,
1429                    "error": e.to_string(),
1430                }),
1431            };
1432            (id, status)
1433        })
1434        .collect();
1435
1436    Ok(serde_json::json!({
1437        "statuses": statuses,
1438    }))
1439}
1440
1441async fn handle_channels_probe(
1442    state: &Arc<RwLock<GatewayState>>,
1443    params: &serde_json::Value,
1444) -> RpcResult {
1445    let channel_id = params["channel_id"]
1446        .as_str()
1447        .ok_or((rpc::INVALID_PARAMS, "Missing channel_id".to_string()))?;
1448
1449    let state = state.read().await;
1450    let registry = state.channels.read().await;
1451
1452    let channel = registry
1453        .get(channel_id)
1454        .ok_or((rpc::NOT_FOUND, format!("Channel not found: {channel_id}")))?;
1455
1456    let probe = channel
1457        .probe()
1458        .await
1459        .map_err(|e| (rpc::INTERNAL_ERROR, format!("Probe failed: {e}")))?;
1460
1461    Ok(serde_json::json!({
1462        "channel_id": channel_id,
1463        "connected": probe.connected,
1464        "account_id": probe.account_id,
1465        "display_name": probe.display_name,
1466        "error": probe.error,
1467    }))
1468}
1469
1470// ============================================================================
1471// Agent RPC Handlers
1472// ============================================================================
1473
1474async fn handle_agent_list(state: &Arc<RwLock<GatewayState>>) -> RpcResult {
1475    let state = state.read().await;
1476    let agents: Vec<&str> = state.agents.keys().map(String::as_str).collect();
1477
1478    Ok(serde_json::json!({
1479        "agents": agents,
1480    }))
1481}
1482
1483async fn handle_agent_get(
1484    state: &Arc<RwLock<GatewayState>>,
1485    params: &serde_json::Value,
1486) -> RpcResult {
1487    let agent_id = params["agent_id"]
1488        .as_str()
1489        .ok_or((rpc::INVALID_PARAMS, "Missing agent_id".to_string()))?;
1490
1491    let state = state.read().await;
1492    let agent = state
1493        .agents
1494        .get(agent_id)
1495        .ok_or((rpc::NOT_FOUND, format!("Agent not found: {agent_id}")))?;
1496
1497    // Return agent info
1498    Ok(serde_json::json!({
1499        "agent_id": agent_id,
1500        "available": true,
1501        "config": {
1502            "model": agent.model(),
1503            "system_prompt": agent.system_prompt(),
1504            "max_tokens": agent.max_tokens(),
1505            "temperature": agent.temperature(),
1506        },
1507    }))
1508}
1509
1510async fn handle_agent_status(
1511    state: &Arc<RwLock<GatewayState>>,
1512    params: &serde_json::Value,
1513) -> RpcResult {
1514    let agent_id = params["agent_id"]
1515        .as_str()
1516        .ok_or((rpc::INVALID_PARAMS, "Missing agent_id".to_string()))?;
1517
1518    let state = state.read().await;
1519    let exists = state.agents.contains_key(agent_id);
1520
1521    Ok(serde_json::json!({
1522        "agent_id": agent_id,
1523        "available": exists,
1524    }))
1525}
1526
1527async fn handle_tools_list(state: &Arc<RwLock<GatewayState>>) -> RpcResult {
1528    let state = state.read().await;
1529    let tools: Vec<serde_json::Value> = state
1530        .tool_registry
1531        .as_tool_definitions()
1532        .iter()
1533        .map(|t| {
1534            serde_json::json!({
1535                "name": t.name,
1536                "description": t.description,
1537                "input_schema": t.input_schema,
1538            })
1539        })
1540        .collect();
1541
1542    Ok(serde_json::json!({
1543        "tools": tools,
1544    }))
1545}
1546
1547async fn handle_tools_execute(
1548    state: &Arc<RwLock<GatewayState>>,
1549    params: &serde_json::Value,
1550) -> RpcResult {
1551    let tool_name = params["tool_name"]
1552        .as_str()
1553        .ok_or((rpc::INVALID_PARAMS, "Missing tool_name".to_string()))?;
1554    let tool_params = params
1555        .get("params")
1556        .cloned()
1557        .unwrap_or(serde_json::json!({}));
1558
1559    let state = state.read().await;
1560    let result = state
1561        .tool_registry
1562        .execute(tool_name, tool_params)
1563        .await
1564        .map_err(|e| (rpc::INTERNAL_ERROR, format!("Tool error: {e}")))?;
1565
1566    serde_json::to_value(&result)
1567        .map_err(|e| (rpc::INTERNAL_ERROR, format!("Serialization error: {e}")))
1568}
1569
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration binds to loopback on port 18789.
    #[test]
    fn test_default_config() {
        let config = GatewayConfig::default();
        assert_eq!(config.port, 18789);
        assert_eq!(config.bind_address, "127.0.0.1");
    }

    /// A builder that is given an event store produces a gateway.
    #[test]
    fn test_builder() {
        // Use a per-process directory so concurrent or repeated test runs do
        // not share (and possibly contend on) the same on-disk store.
        let temp_dir = std::env::temp_dir()
            .join(format!("openclaw-gateway-test-{}", std::process::id()));
        let store = Arc::new(
            EventStore::open(&temp_dir).expect("event store should open in a fresh temp dir"),
        );

        let gateway = GatewayBuilder::new().with_event_store(store).build();
        let built = gateway.is_ok();

        // Release the store before removing its directory; cleanup is
        // best-effort and must not mask the actual assertion.
        drop(gateway);
        let _ = std::fs::remove_dir_all(&temp_dir);

        assert!(built, "GatewayBuilder::build should succeed with an event store");
    }
}