reifydb_sub_server/core/
server.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use reifydb_engine::StandardEngine;
5use reifydb_sub_api::SchedulerService;
6
7use super::Listener;
8use crate::{
9	config::ServerConfig,
10	protocols::{HttpHandler, ProtocolError, ProtocolHandler, ProtocolResult, WebSocketHandler},
11};
12
13/// Multi-protocol server that can handle WebSocket and HTTP protocols
14pub struct ProtocolServer {
15	config: ServerConfig,
16	websocket: Option<WebSocketHandler>,
17	http: Option<HttpHandler>,
18	listener: Option<Listener>,
19	engine: StandardEngine,
20	scheduler: SchedulerService,
21}
22
23impl ProtocolServer {
24	pub fn new(config: ServerConfig, engine: StandardEngine, scheduler: SchedulerService) -> Self {
25		Self {
26			config,
27			websocket: None,
28			http: None,
29			listener: None,
30			engine,
31			scheduler,
32		}
33	}
34
35	/// Add WebSocket protocol support
36	pub fn with_websocket(&mut self) -> &mut Self {
37		self.websocket = Some(WebSocketHandler::new());
38		self
39	}
40
41	/// Add HTTP protocol support
42	pub fn with_http(&mut self) -> &mut Self {
43		self.http = Some(HttpHandler::new());
44		self
45	}
46
47	/// Start the multi-protocol server
48	pub fn start(&mut self) -> ProtocolResult<()> {
49		if self.listener.is_some() {
50			return Ok(());
51		}
52
53		let enabled_protocols = self.get_enabled_protocols();
54		if enabled_protocols.is_empty() {
55			return Err(ProtocolError::Custom(
56				"No protocols configured. Use with_websocket() or with_http()".to_string(),
57			));
58		}
59
60		self.listener = Some(Listener::new(
61			self.config.clone(),
62			self.engine.clone(),
63			self.scheduler.clone(),
64			self.websocket.clone(),
65			self.http.clone(),
66		));
67		Ok(())
68	}
69
70	/// Stop the server
71	pub fn stop(&mut self) {
72		if let Some(worker_pool) = self.listener.take() {
73			worker_pool.stop();
74		}
75	}
76
77	/// Detect which protocol should handle a connection
78	pub fn detect_protocol(&self, buffer: &[u8]) -> Option<&str> {
79		// Check protocols in order of likelihood/preference
80		if let Some(ref websocket) = self.websocket {
81			if <WebSocketHandler as ProtocolHandler>::can_handle(websocket, buffer) {
82				return Some("ws");
83			}
84		}
85
86		if let Some(ref http) = self.http {
87			if <HttpHandler as ProtocolHandler>::can_handle(http, buffer) {
88				return Some("http");
89			}
90		}
91
92		None
93	}
94
95	/// Get WebSocket handler if enabled
96	pub fn websocket_handler(&self) -> Option<&WebSocketHandler> {
97		self.websocket.as_ref()
98	}
99
100	/// Get HTTP handler if enabled
101	pub fn http_handler(&self) -> Option<&HttpHandler> {
102		self.http.as_ref()
103	}
104
105	/// Check if server is running
106	pub fn is_running(&self) -> bool {
107		self.listener.is_some()
108	}
109
110	/// Get server configuration
111	pub fn config(&self) -> &ServerConfig {
112		&self.config
113	}
114
115	/// Get list of enabled protocols
116	pub fn get_enabled_protocols(&self) -> Vec<String> {
117		let mut protocols = Vec::new();
118
119		if self.websocket.is_some() {
120			protocols.push("WebSocket".to_string());
121		}
122		if self.http.is_some() {
123			protocols.push("HTTP".to_string());
124		}
125
126		protocols
127	}
128
129	/// Get list of enabled protocol names
130	pub fn protocols(&self) -> Vec<&str> {
131		let mut protocols = Vec::new();
132
133		if self.websocket.is_some() {
134			protocols.push("ws");
135		}
136		if self.http.is_some() {
137			protocols.push("http");
138		}
139
140		protocols
141	}
142
143	/// Get the actual bound port of the server
144	pub fn port(&self) -> Option<u16> {
145		self.listener.as_ref().map(|pool| pool.port())
146	}
147}