reifydb_sub_server/core/
server.rs1use reifydb_engine::StandardEngine;
5use reifydb_sub_api::SchedulerService;
6
7use super::Listener;
8use crate::{
9 config::ServerConfig,
10 protocols::{HttpHandler, ProtocolError, ProtocolHandler, ProtocolResult, WebSocketHandler},
11};
12
13pub struct ProtocolServer {
15 config: ServerConfig,
16 websocket: Option<WebSocketHandler>,
17 http: Option<HttpHandler>,
18 listener: Option<Listener>,
19 engine: StandardEngine,
20 scheduler: SchedulerService,
21}
22
23impl ProtocolServer {
24 pub fn new(config: ServerConfig, engine: StandardEngine, scheduler: SchedulerService) -> Self {
25 Self {
26 config,
27 websocket: None,
28 http: None,
29 listener: None,
30 engine,
31 scheduler,
32 }
33 }
34
35 pub fn with_websocket(&mut self) -> &mut Self {
37 self.websocket = Some(WebSocketHandler::new());
38 self
39 }
40
41 pub fn with_http(&mut self) -> &mut Self {
43 self.http = Some(HttpHandler::new());
44 self
45 }
46
47 pub fn start(&mut self) -> ProtocolResult<()> {
49 if self.listener.is_some() {
50 return Ok(());
51 }
52
53 let enabled_protocols = self.get_enabled_protocols();
54 if enabled_protocols.is_empty() {
55 return Err(ProtocolError::Custom(
56 "No protocols configured. Use with_websocket() or with_http()".to_string(),
57 ));
58 }
59
60 self.listener = Some(Listener::new(
61 self.config.clone(),
62 self.engine.clone(),
63 self.scheduler.clone(),
64 self.websocket.clone(),
65 self.http.clone(),
66 ));
67 Ok(())
68 }
69
70 pub fn stop(&mut self) {
72 if let Some(worker_pool) = self.listener.take() {
73 worker_pool.stop();
74 }
75 }
76
77 pub fn detect_protocol(&self, buffer: &[u8]) -> Option<&str> {
79 if let Some(ref websocket) = self.websocket {
81 if <WebSocketHandler as ProtocolHandler>::can_handle(websocket, buffer) {
82 return Some("ws");
83 }
84 }
85
86 if let Some(ref http) = self.http {
87 if <HttpHandler as ProtocolHandler>::can_handle(http, buffer) {
88 return Some("http");
89 }
90 }
91
92 None
93 }
94
95 pub fn websocket_handler(&self) -> Option<&WebSocketHandler> {
97 self.websocket.as_ref()
98 }
99
100 pub fn http_handler(&self) -> Option<&HttpHandler> {
102 self.http.as_ref()
103 }
104
105 pub fn is_running(&self) -> bool {
107 self.listener.is_some()
108 }
109
110 pub fn config(&self) -> &ServerConfig {
112 &self.config
113 }
114
115 pub fn get_enabled_protocols(&self) -> Vec<String> {
117 let mut protocols = Vec::new();
118
119 if self.websocket.is_some() {
120 protocols.push("WebSocket".to_string());
121 }
122 if self.http.is_some() {
123 protocols.push("HTTP".to_string());
124 }
125
126 protocols
127 }
128
129 pub fn protocols(&self) -> Vec<&str> {
131 let mut protocols = Vec::new();
132
133 if self.websocket.is_some() {
134 protocols.push("ws");
135 }
136 if self.http.is_some() {
137 protocols.push("http");
138 }
139
140 protocols
141 }
142
143 pub fn port(&self) -> Option<u16> {
145 self.listener.as_ref().map(|pool| pool.port())
146 }
147}