1use std::collections::HashMap;
4use std::net::SocketAddr;
5use std::path::PathBuf;
6use std::sync::Arc;
7
8use axum::{
9 Json, Router,
10 extract::State,
11 extract::ws::{Message, WebSocket, WebSocketUpgrade},
12 response::IntoResponse,
13 routing::{get, post},
14};
15use chrono::Utc;
16use futures::{SinkExt, StreamExt};
17use tokio::sync::RwLock;
18
19use openclaw_agents::runtime::{AgentContext, AgentRuntime};
20use openclaw_agents::tools::ToolRegistry;
21use openclaw_channels::{ChannelCapabilities, ChannelRegistry};
22use openclaw_core::events::{
23 EventStore, SessionEvent, SessionEventKind, SessionMessage, SessionProjection, SessionState,
24};
25use openclaw_core::types::{AgentId, ChannelId, SessionKey};
26
27use crate::GatewayError;
28use crate::auth::{AuthConfig, AuthState, JwtManager, User, UserRole, setup::auto_setup_from_env};
29use crate::events::EventBroadcaster;
30use crate::rpc::{self, RpcRequest, RpcResponse};
31
32#[cfg(feature = "ui")]
33use crate::ui_server::UiServerConfig;
34
35#[derive(Debug, Clone)]
37pub struct GatewayConfig {
38 pub port: u16,
40 pub bind_address: String,
42 pub cors: bool,
44 pub data_dir: PathBuf,
46 pub auth: AuthConfig,
48 #[cfg(feature = "ui")]
50 pub ui: Option<UiServerConfig>,
51}
52
53impl Default for GatewayConfig {
54 fn default() -> Self {
55 let data_dir = dirs::data_dir()
56 .unwrap_or_else(|| PathBuf::from("."))
57 .join("openclaw")
58 .join("gateway");
59
60 Self {
61 port: 18789,
62 bind_address: "127.0.0.1".to_string(),
63 cors: true,
64 data_dir,
65 auth: AuthConfig::default(),
66 #[cfg(feature = "ui")]
67 ui: Some(UiServerConfig::default()),
68 }
69 }
70}
71
72pub struct GatewayState {
74 pub event_store: Arc<EventStore>,
76 pub agents: HashMap<String, Arc<AgentRuntime>>,
78 pub tool_registry: Arc<ToolRegistry>,
80 pub auth: Arc<AuthState>,
82 pub channels: Arc<RwLock<ChannelRegistry>>,
84 pub events: EventBroadcaster,
86 pub config: GatewayConfig,
88}
89
90pub struct Gateway {
92 config: GatewayConfig,
93 state: Arc<RwLock<GatewayState>>,
94}
95
96pub struct GatewayBuilder {
98 config: GatewayConfig,
99 event_store: Option<Arc<EventStore>>,
100 agents: HashMap<String, Arc<AgentRuntime>>,
101 tool_registry: Arc<ToolRegistry>,
102 auth_state: Option<Arc<AuthState>>,
103 channel_registry: Option<Arc<RwLock<ChannelRegistry>>>,
104 event_broadcaster: Option<EventBroadcaster>,
105}
106
107impl GatewayBuilder {
108 #[must_use]
110 pub fn new() -> Self {
111 Self {
112 config: GatewayConfig::default(),
113 event_store: None,
114 agents: HashMap::new(),
115 tool_registry: Arc::new(ToolRegistry::new()),
116 auth_state: None,
117 channel_registry: None,
118 event_broadcaster: None,
119 }
120 }
121
122 #[must_use]
124 pub fn with_config(mut self, config: GatewayConfig) -> Self {
125 self.config = config;
126 self
127 }
128
129 #[must_use]
131 pub fn with_event_store(mut self, store: Arc<EventStore>) -> Self {
132 self.event_store = Some(store);
133 self
134 }
135
136 #[must_use]
138 pub fn with_agent(mut self, id: impl Into<String>, runtime: Arc<AgentRuntime>) -> Self {
139 self.agents.insert(id.into(), runtime);
140 self
141 }
142
143 #[must_use]
145 pub fn with_tool_registry(mut self, registry: Arc<ToolRegistry>) -> Self {
146 self.tool_registry = registry;
147 self
148 }
149
150 #[must_use]
152 pub fn with_auth_state(mut self, auth: Arc<AuthState>) -> Self {
153 self.auth_state = Some(auth);
154 self
155 }
156
157 #[must_use]
159 pub fn with_channel_registry(mut self, registry: Arc<RwLock<ChannelRegistry>>) -> Self {
160 self.channel_registry = Some(registry);
161 self
162 }
163
164 #[must_use]
166 pub fn with_event_broadcaster(mut self, broadcaster: EventBroadcaster) -> Self {
167 self.event_broadcaster = Some(broadcaster);
168 self
169 }
170
171 pub fn build(self) -> Result<Gateway, GatewayError> {
177 let event_store = self
178 .event_store
179 .ok_or_else(|| GatewayError::Config("Event store is required".to_string()))?;
180
181 std::fs::create_dir_all(&self.config.data_dir)
183 .map_err(|e| GatewayError::Config(format!("Failed to create data dir: {e}")))?;
184
185 let auth = if let Some(auth) = self.auth_state {
187 auth
188 } else {
189 let auth_config = self.config.auth.clone().with_env_overrides();
190 Arc::new(
191 AuthState::initialize(auth_config, &self.config.data_dir)
192 .map_err(|e| GatewayError::Config(format!("Auth init failed: {e}")))?,
193 )
194 };
195
196 if let Err(e) = auto_setup_from_env(&auth.users) {
198 tracing::warn!("Auto-setup from env failed: {}", e);
199 }
200
201 let channels = self
203 .channel_registry
204 .unwrap_or_else(|| Arc::new(RwLock::new(ChannelRegistry::new())));
205
206 let events = self.event_broadcaster.unwrap_or_default();
208
209 let state = GatewayState {
210 event_store,
211 agents: self.agents,
212 tool_registry: self.tool_registry,
213 auth,
214 channels,
215 events,
216 config: self.config.clone(),
217 };
218
219 Ok(Gateway {
220 config: self.config,
221 state: Arc::new(RwLock::new(state)),
222 })
223 }
224}
225
226impl Default for GatewayBuilder {
227 fn default() -> Self {
228 Self::new()
229 }
230}
231
232impl Gateway {
233 pub fn new(config: GatewayConfig) -> Result<Self, GatewayError> {
235 std::fs::create_dir_all(&config.data_dir)
237 .map_err(|e| GatewayError::Config(format!("Failed to create data dir: {e}")))?;
238
239 let event_store = Arc::new(
241 EventStore::open(&config.data_dir.join("events"))
242 .map_err(|e| GatewayError::Server(format!("Failed to open event store: {e}")))?,
243 );
244
245 let auth_config = config.auth.clone().with_env_overrides();
247 let auth = Arc::new(
248 AuthState::initialize(auth_config, &config.data_dir)
249 .map_err(|e| GatewayError::Config(format!("Auth init failed: {e}")))?,
250 );
251
252 if let Err(e) = auto_setup_from_env(&auth.users) {
254 tracing::warn!("Auto-setup from env failed: {}", e);
255 }
256
257 let state = GatewayState {
258 event_store,
259 agents: HashMap::new(),
260 tool_registry: Arc::new(ToolRegistry::new()),
261 auth,
262 channels: Arc::new(RwLock::new(ChannelRegistry::new())),
263 events: EventBroadcaster::new(),
264 config: config.clone(),
265 };
266
267 Ok(Self {
268 config,
269 state: Arc::new(RwLock::new(state)),
270 })
271 }
272
273 pub async fn run(&self) -> Result<(), GatewayError> {
278 let state = self.state.clone();
279
280 {
282 let state_read = state.read().await;
283 let mut bootstrap = state_read.auth.bootstrap.write().await;
284 if let Some(_token) = bootstrap.check_and_generate(&state_read.auth.users) {
285 let base_url = format!("http://{}:{}", self.config.bind_address, self.config.port);
286 bootstrap.print_bootstrap_info(&base_url);
287 }
288 }
289
290 let app = Router::new()
292 .route("/health", get(health_handler))
293 .route("/rpc", post(rpc_handler))
294 .route("/ws", get(ws_handler))
295 .with_state(state);
296
297 let addr: SocketAddr = format!("{}:{}", self.config.bind_address, self.config.port)
298 .parse()
299 .map_err(|e| GatewayError::Config(format!("Invalid address: {e}")))?;
300
301 tracing::info!("Gateway API listening on http://{}", addr);
302
303 let api_listener = tokio::net::TcpListener::bind(addr).await?;
305 let api_handle = tokio::spawn(async move { axum::serve(api_listener, app).await });
306
307 #[cfg(feature = "ui")]
309 let ui_handle = if let Some(ref ui_config) = self.config.ui {
310 if ui_config.enabled {
311 let config = ui_config.clone();
312 Some(tokio::spawn(async move {
313 crate::ui_server::run_ui_server(config).await
314 }))
315 } else {
316 None
317 }
318 } else {
319 None
320 };
321
322 #[cfg(feature = "ui")]
324 {
325 tokio::select! {
326 result = api_handle => {
327 result
328 .map_err(|e| GatewayError::Server(format!("API server panic: {e}")))?
329 .map_err(|e| GatewayError::Server(e.to_string()))?;
330 }
331 result = async {
332 match ui_handle {
333 Some(handle) => handle.await,
334 None => std::future::pending().await,
335 }
336 } => {
337 result
338 .map_err(|e| GatewayError::Server(format!("UI server panic: {e}")))?
339 .map_err(|e| GatewayError::Server(e.to_string()))?;
340 }
341 }
342 }
343
344 #[cfg(not(feature = "ui"))]
345 {
346 api_handle
347 .await
348 .map_err(|e| GatewayError::Server(format!("API server panic: {e}")))?
349 .map_err(|e| GatewayError::Server(e.to_string()))?;
350 }
351
352 Ok(())
353 }
354}
355
/// Liveness probe for `GET /health`; always answers with the plain text `OK`.
async fn health_handler() -> &'static str {
    const HEALTHY: &str = "OK";
    HEALTHY
}
359
360async fn rpc_handler(
361 State(state): State<Arc<RwLock<GatewayState>>>,
362 headers: axum::http::HeaderMap,
363 Json(request): Json<RpcRequest>,
364) -> Json<RpcResponse> {
365 let id = request.id.clone();
366
367 let auth_token = headers
369 .get(axum::http::header::AUTHORIZATION)
370 .and_then(|v| v.to_str().ok())
371 .and_then(JwtManager::extract_from_header);
372
373 let result = dispatch_rpc(&state, &request.method, &request.params, auth_token).await;
374
375 Json(match result {
376 Ok(value) => RpcResponse::success(id, value),
377 Err((code, message)) => RpcResponse::error(id, code, message),
378 })
379}
380
381#[derive(Debug, serde::Deserialize)]
383struct WsParams {
384 token: Option<String>,
386}
387
388async fn ws_handler(
389 State(state): State<Arc<RwLock<GatewayState>>>,
390 axum::extract::Query(params): axum::extract::Query<WsParams>,
391 ws: WebSocketUpgrade,
392) -> impl IntoResponse {
393 ws.on_upgrade(move |socket| handle_socket(socket, state, params.token))
394}
395
396async fn handle_socket(
397 socket: WebSocket,
398 state: Arc<RwLock<GatewayState>>,
399 auth_token: Option<String>,
400) {
401 let (sender, mut receiver) = socket.split();
402 let sender = Arc::new(tokio::sync::Mutex::new(sender));
403
404 {
406 let state_read = state.read().await;
407 if state_read.auth.config.enabled && state_read.auth.config.require_auth_for_ws {
408 if let Some(token) = &auth_token {
409 if let Err(e) = state_read.auth.validate_token(token) {
410 let error_response =
411 RpcResponse::error(None, rpc::UNAUTHORIZED, format!("Invalid token: {e}"));
412 let response_text = serde_json::to_string(&error_response).unwrap_or_default();
413 let mut guard = sender.lock().await;
414 let _ = guard.send(Message::Text(response_text.into())).await;
415 return;
416 }
417 } else {
418 let error_response = RpcResponse::error(
419 None,
420 rpc::UNAUTHORIZED,
421 "Authentication required".to_string(),
422 );
423 let response_text = serde_json::to_string(&error_response).unwrap_or_default();
424 let mut guard = sender.lock().await;
425 let _ = guard.send(Message::Text(response_text.into())).await;
426 return;
427 }
428 }
429 }
430
431 let (stop_tx, mut stop_rx) = tokio::sync::oneshot::channel::<()>();
433 let subscribed = Arc::new(std::sync::atomic::AtomicBool::new(false));
434 let subscribed_clone = subscribed.clone();
435 let sender_clone = sender.clone();
436 let state_clone = state.clone();
437
438 let event_task = tokio::spawn(async move {
440 loop {
442 if subscribed_clone.load(std::sync::atomic::Ordering::Relaxed) {
443 break;
444 }
445 if stop_rx.try_recv().is_ok() {
446 return;
447 }
448 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
449 }
450
451 let mut event_rx = {
453 let state_read = state_clone.read().await;
454 state_read.events.subscribe()
455 };
456
457 loop {
458 tokio::select! {
459 _ = &mut stop_rx => {
460 break;
461 }
462 event_result = event_rx.recv() => {
463 match event_result {
464 Ok(envelope) => {
465 let event_msg = serde_json::json!({
466 "jsonrpc": "2.0",
467 "method": "event",
468 "params": envelope,
469 });
470 let msg_text = serde_json::to_string(&event_msg).unwrap_or_default();
471 let mut guard = sender_clone.lock().await;
472 if guard.send(Message::Text(msg_text.into())).await.is_err() {
473 break;
474 }
475 }
476 Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
477 tracing::warn!("Event listener lagged, missed {} events", n);
478 }
479 Err(tokio::sync::broadcast::error::RecvError::Closed) => {
480 break;
481 }
482 }
483 }
484 }
485 }
486 });
487
488 while let Some(msg) = receiver.next().await {
490 let msg = match msg {
491 Ok(Message::Text(text)) => text,
492 Ok(Message::Close(_)) => break,
493 Ok(_) => continue, Err(e) => {
495 tracing::warn!("WebSocket receive error: {}", e);
496 break;
497 }
498 };
499
500 let request: RpcRequest = match serde_json::from_str(&msg) {
502 Ok(req) => req,
503 Err(e) => {
504 let error_response =
505 RpcResponse::error(None, rpc::PARSE_ERROR, format!("Parse error: {e}"));
506 let response_text = serde_json::to_string(&error_response).unwrap_or_default();
507 let mut guard = sender.lock().await;
508 if guard
509 .send(Message::Text(response_text.into()))
510 .await
511 .is_err()
512 {
513 break;
514 }
515 continue;
516 }
517 };
518
519 if request.method == "events.subscribe" {
521 subscribed.store(true, std::sync::atomic::Ordering::Relaxed);
522 }
523
524 let id = request.id.clone();
525 let token_ref = auth_token.as_deref();
526 let result = dispatch_rpc(&state, &request.method, &request.params, token_ref).await;
527
528 let response = match result {
529 Ok(value) => RpcResponse::success(id, value),
530 Err((code, message)) => RpcResponse::error(id, code, message),
531 };
532
533 let response_text = serde_json::to_string(&response).unwrap_or_default();
534 let mut guard = sender.lock().await;
535 if guard
536 .send(Message::Text(response_text.into()))
537 .await
538 .is_err()
539 {
540 break;
541 }
542 }
543
544 let _ = stop_tx.send(());
546 let _ = event_task.await;
547
548 tracing::debug!("WebSocket connection closed");
549}
550
551async fn dispatch_rpc(
553 state: &Arc<RwLock<GatewayState>>,
554 method: &str,
555 params: &serde_json::Value,
556 auth_token: Option<&str>,
557) -> RpcResult {
558 let state_read = state.read().await;
559
560 if state_read.auth.requires_auth(method) {
562 let token =
563 auth_token.ok_or_else(|| (rpc::UNAUTHORIZED, "Authentication required".to_string()))?;
564
565 state_read
566 .auth
567 .validate_token(token)
568 .map_err(|e| (rpc::UNAUTHORIZED, format!("Invalid token: {e}")))?;
569 }
570
571 drop(state_read);
572
573 match method {
574 "auth.login" => handle_auth_login(state, params).await,
576 "auth.logout" => handle_auth_logout(state, params).await,
577 "auth.refresh" => handle_auth_refresh(state, params).await,
578 "auth.me" => handle_auth_me(state, auth_token).await,
579
580 "setup.status" => handle_setup_status(state).await,
582 "setup.init" => handle_setup_init(state, params).await,
583
584 "users.list" => handle_users_list(state, auth_token).await,
586 "users.create" => handle_users_create(state, params, auth_token).await,
587 "users.update" => handle_users_update(state, params, auth_token).await,
588 "users.delete" => handle_users_delete(state, params, auth_token).await,
589
590 "session.create" => handle_session_create(state, params).await,
592 "session.message" => handle_session_message(state, params).await,
593 "session.history" => handle_session_history(state, params).await,
594 "session.end" => handle_session_end(state, params).await,
595 "session.list" => handle_session_list(state, params).await,
596 "session.search" => handle_session_search(state, params).await,
597 "session.stats" => handle_session_stats(state).await,
598 "session.events" => handle_session_events(state, params).await,
599
600 "channels.list" => handle_channels_list(state).await,
602 "channels.status" => handle_channels_status(state).await,
603 "channels.probe" => handle_channels_probe(state, params).await,
604
605 "agent.list" => handle_agent_list(state).await,
607 "agent.status" => handle_agent_status(state, params).await,
608 "agent.get" => handle_agent_get(state, params).await,
609
610 "tools.list" => handle_tools_list(state).await,
612 "tools.execute" => handle_tools_execute(state, params).await,
613
614 "system.health" => handle_system_health(state).await,
616 "system.version" => handle_system_version().await,
617
618 "events.subscribe" => handle_events_subscribe().await,
620
621 _ => Err((rpc::METHOD_NOT_FOUND, format!("Method not found: {method}"))),
622 }
623}
624
625type RpcResult = Result<serde_json::Value, (i32, String)>;
626
627async fn handle_auth_login(
632 state: &Arc<RwLock<GatewayState>>,
633 params: &serde_json::Value,
634) -> RpcResult {
635 let username = params["username"]
636 .as_str()
637 .ok_or((rpc::INVALID_PARAMS, "Missing username".to_string()))?;
638 let password = params["password"]
639 .as_str()
640 .ok_or((rpc::INVALID_PARAMS, "Missing password".to_string()))?;
641
642 let state = state.read().await;
643
644 let user = state
646 .auth
647 .users
648 .get_by_username(username)
649 .map_err(|e| (rpc::INTERNAL_ERROR, format!("Storage error: {e}")))?
650 .ok_or((rpc::UNAUTHORIZED, "Invalid credentials".to_string()))?;
651
652 if !user.active {
654 return Err((rpc::UNAUTHORIZED, "Account disabled".to_string()));
655 }
656
657 user.verify_password(password)
659 .map_err(|_| (rpc::UNAUTHORIZED, "Invalid credentials".to_string()))?;
660
661 state.auth.users.update_last_login(&user.id).map_err(|e| {
663 (
664 rpc::INTERNAL_ERROR,
665 format!("Failed to update login time: {e}"),
666 )
667 })?;
668
669 let token_pair = state
671 .auth
672 .jwt
673 .create_token_pair(&user.id, &user.username, user.role)
674 .map_err(|e| (rpc::INTERNAL_ERROR, format!("Token generation failed: {e}")))?;
675
676 Ok(serde_json::json!({
677 "token": token_pair.access_token,
678 "refresh_token": token_pair.refresh_token,
679 "expires_at": token_pair.expires_at.to_rfc3339(),
680 "user": user.to_public(),
681 }))
682}
683
684async fn handle_auth_logout(
685 _state: &Arc<RwLock<GatewayState>>,
686 _params: &serde_json::Value,
687) -> RpcResult {
688 Ok(serde_json::json!({
691 "success": true,
692 }))
693}
694
695async fn handle_auth_refresh(
696 state: &Arc<RwLock<GatewayState>>,
697 params: &serde_json::Value,
698) -> RpcResult {
699 let refresh_token = params["refresh_token"]
700 .as_str()
701 .ok_or((rpc::INVALID_PARAMS, "Missing refresh_token".to_string()))?;
702
703 let state = state.read().await;
704
705 let token_pair = state
706 .auth
707 .jwt
708 .refresh_tokens(refresh_token)
709 .map_err(|e| (rpc::UNAUTHORIZED, format!("Refresh failed: {e}")))?;
710
711 Ok(serde_json::json!({
712 "token": token_pair.access_token,
713 "refresh_token": token_pair.refresh_token,
714 "expires_at": token_pair.expires_at.to_rfc3339(),
715 }))
716}
717
718async fn handle_auth_me(state: &Arc<RwLock<GatewayState>>, auth_token: Option<&str>) -> RpcResult {
719 let token = auth_token.ok_or((rpc::UNAUTHORIZED, "Not authenticated".to_string()))?;
720
721 let state = state.read().await;
722 let claims = state
723 .auth
724 .validate_token(token)
725 .map_err(|e| (rpc::UNAUTHORIZED, format!("Invalid token: {e}")))?;
726
727 let user = state
728 .auth
729 .users
730 .get(&claims.sub)
731 .map_err(|e| (rpc::INTERNAL_ERROR, format!("Storage error: {e}")))?
732 .ok_or((rpc::NOT_FOUND, "User not found".to_string()))?;
733
734 Ok(serde_json::json!({
735 "user": user.to_public(),
736 }))
737}
738
739async fn handle_setup_status(state: &Arc<RwLock<GatewayState>>) -> RpcResult {
744 let state = state.read().await;
745 let bootstrap = state.auth.bootstrap.read().await;
746
747 let base_url = format!("http://{}:{}", state.config.bind_address, state.config.port);
748
749 let status = bootstrap.status(&state.auth.users, Some(&base_url));
750
751 serde_json::to_value(&status)
752 .map_err(|e| (rpc::INTERNAL_ERROR, format!("Serialization error: {e}")))
753}
754
755async fn handle_setup_init(
756 state: &Arc<RwLock<GatewayState>>,
757 params: &serde_json::Value,
758) -> RpcResult {
759 let bootstrap_token = params["bootstrap_token"]
760 .as_str()
761 .ok_or((rpc::INVALID_PARAMS, "Missing bootstrap_token".to_string()))?;
762 let username = params["admin_username"]
763 .as_str()
764 .ok_or((rpc::INVALID_PARAMS, "Missing admin_username".to_string()))?;
765 let password = params["admin_password"]
766 .as_str()
767 .ok_or((rpc::INVALID_PARAMS, "Missing admin_password".to_string()))?;
768 let email = params["email"].as_str().map(String::from);
769
770 let state = state.read().await;
771 let mut bootstrap = state.auth.bootstrap.write().await;
772
773 let admin = bootstrap
774 .complete_setup(
775 &state.auth.users,
776 bootstrap_token,
777 username,
778 password,
779 email,
780 )
781 .map_err(|e| (rpc::UNAUTHORIZED, format!("Setup failed: {e}")))?;
782
783 let token_pair = state
785 .auth
786 .jwt
787 .create_token_pair(&admin.id, &admin.username, admin.role)
788 .map_err(|e| (rpc::INTERNAL_ERROR, format!("Token generation failed: {e}")))?;
789
790 Ok(serde_json::json!({
791 "token": token_pair.access_token,
792 "refresh_token": token_pair.refresh_token,
793 "expires_at": token_pair.expires_at.to_rfc3339(),
794 "user": admin.to_public(),
795 }))
796}
797
798fn require_admin(state: &GatewayState, token: Option<&str>) -> Result<(), (i32, String)> {
803 let token = token.ok_or((rpc::UNAUTHORIZED, "Not authenticated".to_string()))?;
804 let claims = state
805 .auth
806 .validate_token(token)
807 .map_err(|e| (rpc::UNAUTHORIZED, format!("Invalid token: {e}")))?;
808
809 if !claims.role.is_admin() {
810 return Err((rpc::FORBIDDEN, "Admin role required".to_string()));
811 }
812
813 Ok(())
814}
815
816async fn handle_users_list(
817 state: &Arc<RwLock<GatewayState>>,
818 auth_token: Option<&str>,
819) -> RpcResult {
820 let state = state.read().await;
821 require_admin(&state, auth_token)?;
822
823 let users = state
824 .auth
825 .users
826 .list()
827 .map_err(|e| (rpc::INTERNAL_ERROR, format!("Storage error: {e}")))?;
828
829 let public_users: Vec<_> = users.iter().map(User::to_public).collect();
830
831 Ok(serde_json::json!({
832 "users": public_users,
833 "total": public_users.len(),
834 }))
835}
836
837async fn handle_users_create(
838 state: &Arc<RwLock<GatewayState>>,
839 params: &serde_json::Value,
840 auth_token: Option<&str>,
841) -> RpcResult {
842 let state = state.read().await;
843 require_admin(&state, auth_token)?;
844
845 let username = params["username"]
846 .as_str()
847 .ok_or((rpc::INVALID_PARAMS, "Missing username".to_string()))?;
848 let password = params["password"]
849 .as_str()
850 .ok_or((rpc::INVALID_PARAMS, "Missing password".to_string()))?;
851 let role_str = params["role"].as_str().unwrap_or("viewer");
852 let email = params["email"].as_str().map(String::from);
853
854 let role: UserRole = role_str
855 .parse()
856 .map_err(|e| (rpc::INVALID_PARAMS, format!("Invalid role: {e}")))?;
857
858 let mut user = User::new(username, password, role)
859 .map_err(|e| (rpc::INTERNAL_ERROR, format!("User creation failed: {e}")))?;
860 user.email = email;
861
862 state
863 .auth
864 .users
865 .create(&user)
866 .map_err(|e| (rpc::INTERNAL_ERROR, format!("Storage error: {e}")))?;
867
868 Ok(serde_json::json!({
869 "user": user.to_public(),
870 }))
871}
872
873async fn handle_users_update(
874 state: &Arc<RwLock<GatewayState>>,
875 params: &serde_json::Value,
876 auth_token: Option<&str>,
877) -> RpcResult {
878 let state = state.read().await;
879 require_admin(&state, auth_token)?;
880
881 let id = params["id"]
882 .as_str()
883 .ok_or((rpc::INVALID_PARAMS, "Missing id".to_string()))?;
884
885 let mut user = state
886 .auth
887 .users
888 .get(id)
889 .map_err(|e| (rpc::INTERNAL_ERROR, format!("Storage error: {e}")))?
890 .ok_or((rpc::NOT_FOUND, format!("User not found: {id}")))?;
891
892 if let Some(role_str) = params["role"].as_str() {
894 user.role = role_str
895 .parse()
896 .map_err(|e| (rpc::INVALID_PARAMS, format!("Invalid role: {e}")))?;
897 }
898
899 if let Some(active) = params["active"].as_bool() {
900 user.active = active;
901 }
902
903 if let Some(email) = params["email"].as_str() {
904 user.email = Some(email.to_string());
905 }
906
907 state
908 .auth
909 .users
910 .update(&user)
911 .map_err(|e| (rpc::INTERNAL_ERROR, format!("Storage error: {e}")))?;
912
913 Ok(serde_json::json!({
914 "user": user.to_public(),
915 }))
916}
917
918async fn handle_users_delete(
919 state: &Arc<RwLock<GatewayState>>,
920 params: &serde_json::Value,
921 auth_token: Option<&str>,
922) -> RpcResult {
923 let state = state.read().await;
924 require_admin(&state, auth_token)?;
925
926 let id = params["id"]
927 .as_str()
928 .ok_or((rpc::INVALID_PARAMS, "Missing id".to_string()))?;
929
930 let users = state
932 .auth
933 .users
934 .list()
935 .map_err(|e| (rpc::INTERNAL_ERROR, format!("Storage error: {e}")))?;
936
937 let admin_count = users
938 .iter()
939 .filter(|u| u.role.is_admin() && u.active)
940 .count();
941 let target_user = users.iter().find(|u| u.id == id);
942
943 if let Some(user) = target_user {
944 if user.role.is_admin() && admin_count <= 1 {
945 return Err((rpc::FORBIDDEN, "Cannot delete the last admin".to_string()));
946 }
947 }
948
949 let deleted = state
950 .auth
951 .users
952 .delete(id)
953 .map_err(|e| (rpc::INTERNAL_ERROR, format!("Storage error: {e}")))?;
954
955 Ok(serde_json::json!({
956 "success": deleted,
957 }))
958}
959
960async fn handle_system_health(state: &Arc<RwLock<GatewayState>>) -> RpcResult {
965 let state = state.read().await;
966
967 Ok(serde_json::json!({
968 "status": "healthy",
969 "auth_enabled": state.auth.config.enabled,
970 "users_configured": !state.auth.users.is_empty(),
971 "agents_count": state.agents.len(),
972 }))
973}
974
975async fn handle_system_version() -> RpcResult {
976 Ok(serde_json::json!({
977 "version": env!("CARGO_PKG_VERSION"),
978 "name": "openclaw-gateway",
979 }))
980}
981
982async fn handle_events_subscribe() -> RpcResult {
986 Ok(serde_json::json!({
987 "subscribed": true,
988 "message": "Events will be pushed to this connection",
989 }))
990}
991
992async fn handle_session_create(
997 state: &Arc<RwLock<GatewayState>>,
998 params: &serde_json::Value,
999) -> RpcResult {
1000 let agent_id = params["agent_id"].as_str().unwrap_or("default").to_string();
1001 let channel = params["channel"].as_str().unwrap_or("api").to_string();
1002 let peer_id = params["peer_id"]
1003 .as_str()
1004 .unwrap_or("anonymous")
1005 .to_string();
1006
1007 let session_key = SessionKey::build(
1008 &AgentId::new(&agent_id),
1009 &ChannelId::new(&channel),
1010 "gateway",
1011 openclaw_core::types::PeerType::Dm,
1012 &openclaw_core::types::PeerId::new(&peer_id),
1013 );
1014
1015 let event = SessionEvent::new(
1016 session_key.clone(),
1017 agent_id.clone(),
1018 SessionEventKind::SessionStarted {
1019 channel: channel.clone(),
1020 peer_id: peer_id.clone(),
1021 },
1022 );
1023
1024 let state = state.read().await;
1025 state.event_store.append(&event).map_err(|e| {
1026 (
1027 rpc::INTERNAL_ERROR,
1028 format!("Failed to create session: {e}"),
1029 )
1030 })?;
1031
1032 Ok(serde_json::json!({
1033 "session_key": session_key.as_ref(),
1034 "agent_id": agent_id,
1035 "channel": channel,
1036 "peer_id": peer_id,
1037 }))
1038}
1039
1040async fn handle_session_message(
1041 state: &Arc<RwLock<GatewayState>>,
1042 params: &serde_json::Value,
1043) -> RpcResult {
1044 let session_key_str = params["session_key"]
1045 .as_str()
1046 .ok_or((rpc::INVALID_PARAMS, "Missing session_key".to_string()))?;
1047 let message = params["message"]
1048 .as_str()
1049 .ok_or((rpc::INVALID_PARAMS, "Missing message".to_string()))?;
1050 let agent_id_str = params["agent_id"].as_str().unwrap_or("default");
1051
1052 let session_key = SessionKey::new(session_key_str);
1053 let state = state.read().await;
1054
1055 let recv_event = SessionEvent::new(
1057 session_key.clone(),
1058 agent_id_str.to_string(),
1059 SessionEventKind::MessageReceived {
1060 content: message.to_string(),
1061 attachments: vec![],
1062 },
1063 );
1064 state
1065 .event_store
1066 .append(&recv_event)
1067 .map_err(|e| (rpc::INTERNAL_ERROR, format!("Failed to log message: {e}")))?;
1068
1069 let agent = state.agents.get(agent_id_str).ok_or((
1071 rpc::INVALID_PARAMS,
1072 format!("Agent not found: {agent_id_str}"),
1073 ))?;
1074
1075 let projection = state
1077 .event_store
1078 .get_projection(&session_key)
1079 .unwrap_or_else(|_| {
1080 SessionProjection::new(
1081 session_key.clone(),
1082 agent_id_str.to_string(),
1083 ChannelId::new("api"),
1084 "anonymous".to_string(),
1085 )
1086 });
1087
1088 let mut ctx = AgentContext::new(
1090 AgentId::new(agent_id_str),
1091 session_key.clone(),
1092 projection,
1093 state.tool_registry.clone(),
1094 );
1095
1096 let response = agent
1097 .process_message(&mut ctx, message)
1098 .await
1099 .map_err(|e| (rpc::INTERNAL_ERROR, format!("Agent error: {e}")))?;
1100
1101 let resp_event = SessionEvent::new(
1103 session_key,
1104 agent_id_str.to_string(),
1105 SessionEventKind::AgentResponse {
1106 content: response.clone(),
1107 model: String::new(),
1108 tokens: openclaw_core::types::TokenUsage::default(),
1109 },
1110 );
1111 state
1112 .event_store
1113 .append(&resp_event)
1114 .map_err(|e| (rpc::INTERNAL_ERROR, format!("Failed to log response: {e}")))?;
1115
1116 Ok(serde_json::json!({
1117 "response": response,
1118 }))
1119}
1120
1121async fn handle_session_history(
1122 state: &Arc<RwLock<GatewayState>>,
1123 params: &serde_json::Value,
1124) -> RpcResult {
1125 let session_key_str = params["session_key"]
1126 .as_str()
1127 .ok_or((rpc::INVALID_PARAMS, "Missing session_key".to_string()))?;
1128
1129 let session_key = SessionKey::new(session_key_str);
1130 let state = state.read().await;
1131
1132 let projection = state
1133 .event_store
1134 .get_projection(&session_key)
1135 .map_err(|e| (rpc::INTERNAL_ERROR, format!("Failed to get session: {e}")))?;
1136
1137 serde_json::to_value(&projection)
1138 .map_err(|e| (rpc::INTERNAL_ERROR, format!("Serialization error: {e}")))
1139}
1140
1141async fn handle_session_end(
1142 state: &Arc<RwLock<GatewayState>>,
1143 params: &serde_json::Value,
1144) -> RpcResult {
1145 let session_key_str = params["session_key"]
1146 .as_str()
1147 .ok_or((rpc::INVALID_PARAMS, "Missing session_key".to_string()))?;
1148 let reason = params["reason"]
1149 .as_str()
1150 .unwrap_or("user_requested")
1151 .to_string();
1152
1153 let session_key = SessionKey::new(session_key_str);
1154 let state = state.read().await;
1155
1156 let event = SessionEvent::new(
1157 session_key,
1158 "gateway".to_string(),
1159 SessionEventKind::SessionEnded {
1160 reason: reason.clone(),
1161 },
1162 );
1163
1164 state
1165 .event_store
1166 .append(&event)
1167 .map_err(|e| (rpc::INTERNAL_ERROR, format!("Failed to end session: {e}")))?;
1168
1169 Ok(serde_json::json!({
1170 "status": "ended",
1171 "reason": reason,
1172 }))
1173}
1174
1175async fn handle_session_list(
1177 state: &Arc<RwLock<GatewayState>>,
1178 params: &serde_json::Value,
1179) -> RpcResult {
1180 let limit = params["limit"].as_u64().unwrap_or(50) as usize;
1181 let offset = params["offset"].as_u64().unwrap_or(0) as usize;
1182 let filter_channel = params["channel"].as_str();
1183 let filter_agent = params["agent"].as_str();
1184 let filter_state = params["state"].as_str();
1185
1186 let state = state.read().await;
1187 let session_keys = state
1188 .event_store
1189 .list_sessions()
1190 .map_err(|e| (rpc::INTERNAL_ERROR, format!("Failed to list sessions: {e}")))?;
1191
1192 let mut sessions: Vec<SessionProjection> = session_keys
1194 .into_iter()
1195 .filter_map(|key| state.event_store.get_projection(&key).ok())
1196 .filter(|p| {
1197 if let Some(ch) = filter_channel {
1199 if p.channel.as_ref() != ch {
1200 return false;
1201 }
1202 }
1203 if let Some(agent) = filter_agent {
1204 if p.agent_id != agent {
1205 return false;
1206 }
1207 }
1208 if let Some(st) = filter_state {
1209 let state_match = match st {
1210 "active" => p.state == SessionState::Active,
1211 "paused" => p.state == SessionState::Paused,
1212 "ended" => p.state == SessionState::Ended,
1213 _ => true,
1214 };
1215 if !state_match {
1216 return false;
1217 }
1218 }
1219 true
1220 })
1221 .collect();
1222
1223 sessions.sort_by(|a, b| b.last_activity.cmp(&a.last_activity));
1225
1226 let total = sessions.len();
1227
1228 let sessions: Vec<_> = sessions.into_iter().skip(offset).take(limit).collect();
1230
1231 Ok(serde_json::json!({
1232 "sessions": sessions,
1233 "total": total,
1234 "limit": limit,
1235 "offset": offset,
1236 }))
1237}
1238
1239async fn handle_session_search(
1241 state: &Arc<RwLock<GatewayState>>,
1242 params: &serde_json::Value,
1243) -> RpcResult {
1244 let query = params["query"]
1245 .as_str()
1246 .ok_or((rpc::INVALID_PARAMS, "Missing query".to_string()))?
1247 .to_lowercase();
1248 let filter_channel = params["channel"].as_str();
1249 let filter_agent = params["agent"].as_str();
1250 let limit = params["limit"].as_u64().unwrap_or(20) as usize;
1251
1252 let state = state.read().await;
1253 let session_keys = state
1254 .event_store
1255 .list_sessions()
1256 .map_err(|e| (rpc::INTERNAL_ERROR, format!("Failed to list sessions: {e}")))?;
1257
1258 let mut results: Vec<SessionProjection> = session_keys
1260 .into_iter()
1261 .filter_map(|key| state.event_store.get_projection(&key).ok())
1262 .filter(|p| {
1263 if let Some(ch) = filter_channel {
1265 if p.channel.as_ref() != ch {
1266 return false;
1267 }
1268 }
1269 if let Some(agent) = filter_agent {
1270 if p.agent_id != agent {
1271 return false;
1272 }
1273 }
1274
1275 if p.peer_id.to_lowercase().contains(&query) {
1277 return true;
1278 }
1279
1280 if p.session_key.as_ref().to_lowercase().contains(&query) {
1282 return true;
1283 }
1284
1285 for msg in &p.messages {
1287 let content = match msg {
1288 SessionMessage::Inbound(c) | SessionMessage::Outbound(c) => c,
1289 SessionMessage::Tool { result, .. } => result,
1290 };
1291 if content.to_lowercase().contains(&query) {
1292 return true;
1293 }
1294 }
1295
1296 false
1297 })
1298 .collect();
1299
1300 results.sort_by(|a, b| b.last_activity.cmp(&a.last_activity));
1302 results.truncate(limit);
1303
1304 Ok(serde_json::json!({
1305 "sessions": results,
1306 "count": results.len(),
1307 "query": query,
1308 }))
1309}
1310
1311async fn handle_session_stats(state: &Arc<RwLock<GatewayState>>) -> RpcResult {
1313 let state = state.read().await;
1314 let session_keys = state
1315 .event_store
1316 .list_sessions()
1317 .map_err(|e| (rpc::INTERNAL_ERROR, format!("Failed to list sessions: {e}")))?;
1318
1319 let mut total = 0;
1320 let mut active = 0;
1321 let mut by_channel: HashMap<String, u64> = HashMap::new();
1322 let mut by_agent: HashMap<String, u64> = HashMap::new();
1323 let mut total_messages: u64 = 0;
1324
1325 for key in session_keys {
1326 if let Ok(projection) = state.event_store.get_projection(&key) {
1327 total += 1;
1328 if projection.state == SessionState::Active {
1329 active += 1;
1330 }
1331 *by_channel
1332 .entry(projection.channel.as_ref().to_string())
1333 .or_insert(0) += 1;
1334 *by_agent.entry(projection.agent_id.clone()).or_insert(0) += 1;
1335 total_messages += projection.message_count;
1336 }
1337 }
1338
1339 Ok(serde_json::json!({
1340 "total": total,
1341 "active": active,
1342 "by_channel": by_channel,
1343 "by_agent": by_agent,
1344 "total_messages": total_messages,
1345 }))
1346}
1347
1348async fn handle_session_events(
1350 state: &Arc<RwLock<GatewayState>>,
1351 params: &serde_json::Value,
1352) -> RpcResult {
1353 let session_key_str = params["session_key"]
1354 .as_str()
1355 .ok_or((rpc::INVALID_PARAMS, "Missing session_key".to_string()))?;
1356 let since = params["since"].as_str().and_then(|s| {
1357 chrono::DateTime::parse_from_rfc3339(s)
1358 .ok()
1359 .map(|dt| dt.with_timezone(&Utc))
1360 });
1361
1362 let session_key = SessionKey::new(session_key_str);
1363 let state = state.read().await;
1364
1365 let events = if let Some(since_time) = since {
1366 state
1367 .event_store
1368 .get_events_since(&session_key, since_time)
1369 .map_err(|e| (rpc::INTERNAL_ERROR, format!("Failed to get events: {e}")))?
1370 } else {
1371 state
1372 .event_store
1373 .get_events(&session_key)
1374 .map_err(|e| (rpc::INTERNAL_ERROR, format!("Failed to get events: {e}")))?
1375 };
1376
1377 Ok(serde_json::json!({
1378 "events": events,
1379 "count": events.len(),
1380 }))
1381}
1382
1383#[derive(Debug, Clone, serde::Serialize)]
1389struct ChannelInfo {
1390 id: String,
1391 label: String,
1392 capabilities: ChannelCapabilities,
1393}
1394
1395async fn handle_channels_list(state: &Arc<RwLock<GatewayState>>) -> RpcResult {
1396 let state = state.read().await;
1397 let registry = state.channels.read().await;
1398
1399 let channels: Vec<String> = registry
1400 .list()
1401 .iter()
1402 .map(std::string::ToString::to_string)
1403 .collect();
1404
1405 Ok(serde_json::json!({
1406 "channels": channels,
1407 "count": channels.len(),
1408 }))
1409}
1410
1411async fn handle_channels_status(state: &Arc<RwLock<GatewayState>>) -> RpcResult {
1412 let state = state.read().await;
1413 let registry = state.channels.read().await;
1414
1415 let probes = registry.probe_all().await;
1416
1417 let statuses: HashMap<String, serde_json::Value> = probes
1418 .into_iter()
1419 .map(|(id, result)| {
1420 let status = match result {
1421 Ok(probe) => serde_json::json!({
1422 "connected": probe.connected,
1423 "account_id": probe.account_id,
1424 "display_name": probe.display_name,
1425 "error": probe.error,
1426 }),
1427 Err(e) => serde_json::json!({
1428 "connected": false,
1429 "error": e.to_string(),
1430 }),
1431 };
1432 (id, status)
1433 })
1434 .collect();
1435
1436 Ok(serde_json::json!({
1437 "statuses": statuses,
1438 }))
1439}
1440
1441async fn handle_channels_probe(
1442 state: &Arc<RwLock<GatewayState>>,
1443 params: &serde_json::Value,
1444) -> RpcResult {
1445 let channel_id = params["channel_id"]
1446 .as_str()
1447 .ok_or((rpc::INVALID_PARAMS, "Missing channel_id".to_string()))?;
1448
1449 let state = state.read().await;
1450 let registry = state.channels.read().await;
1451
1452 let channel = registry
1453 .get(channel_id)
1454 .ok_or((rpc::NOT_FOUND, format!("Channel not found: {channel_id}")))?;
1455
1456 let probe = channel
1457 .probe()
1458 .await
1459 .map_err(|e| (rpc::INTERNAL_ERROR, format!("Probe failed: {e}")))?;
1460
1461 Ok(serde_json::json!({
1462 "channel_id": channel_id,
1463 "connected": probe.connected,
1464 "account_id": probe.account_id,
1465 "display_name": probe.display_name,
1466 "error": probe.error,
1467 }))
1468}
1469
1470async fn handle_agent_list(state: &Arc<RwLock<GatewayState>>) -> RpcResult {
1475 let state = state.read().await;
1476 let agents: Vec<&str> = state.agents.keys().map(String::as_str).collect();
1477
1478 Ok(serde_json::json!({
1479 "agents": agents,
1480 }))
1481}
1482
1483async fn handle_agent_get(
1484 state: &Arc<RwLock<GatewayState>>,
1485 params: &serde_json::Value,
1486) -> RpcResult {
1487 let agent_id = params["agent_id"]
1488 .as_str()
1489 .ok_or((rpc::INVALID_PARAMS, "Missing agent_id".to_string()))?;
1490
1491 let state = state.read().await;
1492 let agent = state
1493 .agents
1494 .get(agent_id)
1495 .ok_or((rpc::NOT_FOUND, format!("Agent not found: {agent_id}")))?;
1496
1497 Ok(serde_json::json!({
1499 "agent_id": agent_id,
1500 "available": true,
1501 "config": {
1502 "model": agent.model(),
1503 "system_prompt": agent.system_prompt(),
1504 "max_tokens": agent.max_tokens(),
1505 "temperature": agent.temperature(),
1506 },
1507 }))
1508}
1509
1510async fn handle_agent_status(
1511 state: &Arc<RwLock<GatewayState>>,
1512 params: &serde_json::Value,
1513) -> RpcResult {
1514 let agent_id = params["agent_id"]
1515 .as_str()
1516 .ok_or((rpc::INVALID_PARAMS, "Missing agent_id".to_string()))?;
1517
1518 let state = state.read().await;
1519 let exists = state.agents.contains_key(agent_id);
1520
1521 Ok(serde_json::json!({
1522 "agent_id": agent_id,
1523 "available": exists,
1524 }))
1525}
1526
1527async fn handle_tools_list(state: &Arc<RwLock<GatewayState>>) -> RpcResult {
1528 let state = state.read().await;
1529 let tools: Vec<serde_json::Value> = state
1530 .tool_registry
1531 .as_tool_definitions()
1532 .iter()
1533 .map(|t| {
1534 serde_json::json!({
1535 "name": t.name,
1536 "description": t.description,
1537 "input_schema": t.input_schema,
1538 })
1539 })
1540 .collect();
1541
1542 Ok(serde_json::json!({
1543 "tools": tools,
1544 }))
1545}
1546
1547async fn handle_tools_execute(
1548 state: &Arc<RwLock<GatewayState>>,
1549 params: &serde_json::Value,
1550) -> RpcResult {
1551 let tool_name = params["tool_name"]
1552 .as_str()
1553 .ok_or((rpc::INVALID_PARAMS, "Missing tool_name".to_string()))?;
1554 let tool_params = params
1555 .get("params")
1556 .cloned()
1557 .unwrap_or(serde_json::json!({}));
1558
1559 let state = state.read().await;
1560 let result = state
1561 .tool_registry
1562 .execute(tool_name, tool_params)
1563 .await
1564 .map_err(|e| (rpc::INTERNAL_ERROR, format!("Tool error: {e}")))?;
1565
1566 serde_json::to_value(&result)
1567 .map_err(|e| (rpc::INTERNAL_ERROR, format!("Serialization error: {e}")))
1568}
1569
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration listens on the loopback address, port 18789.
    #[test]
    fn test_default_config() {
        let config = GatewayConfig::default();
        assert_eq!(config.port, 18789);
        assert_eq!(config.bind_address, "127.0.0.1");
    }

    /// A gateway can be built from a builder that was given an event store.
    #[test]
    fn test_builder() {
        // Use a per-process directory so that concurrent test processes and
        // leftovers from earlier runs do not share on-disk state.
        let temp_dir = std::env::temp_dir().join(format!(
            "openclaw-gateway-test-{}",
            std::process::id()
        ));
        let store = Arc::new(
            EventStore::open(&temp_dir).expect("event store should open in a fresh temp dir"),
        );

        let gateway = GatewayBuilder::new().with_event_store(store).build();

        assert!(gateway.is_ok());

        // Release the store before the best-effort cleanup of the directory.
        drop(gateway);
        let _ = std::fs::remove_dir_all(&temp_dir);
    }
}