binary_options_tools_core_pre/client.rs
1use crate::callback::ConnectionCallback;
2use crate::connector::Connector;
3use crate::error::CoreResult;
4use crate::middleware::{MiddlewareContext, MiddlewareStack};
5use crate::signals::Signals;
6use crate::traits::{ApiModule, AppState, ReconnectCallback, Rule};
7use futures_util::{SinkExt, stream::StreamExt};
8use kanal::{AsyncReceiver, AsyncSender};
9use std::any::{Any, TypeId};
10use std::collections::HashMap;
11use std::sync::Arc;
12use tokio::sync::RwLock;
13use tokio::task::JoinSet;
14use tokio_tungstenite::tungstenite::Message;
15use tracing::{debug, error, info, warn};
16
17/// A lightweight handler is a function that can process messages without being tied to a specific module.
18/// It can be used for quick, non-blocking operations that don't require a full module lifecycle
19/// or state management.
20/// It takes a message, the shared application state, and a sender for outgoing messages.
21/// It returns a future that resolves to a `CoreResult<()>`, indicating success or failure.
22/// This is useful for handling messages that need to be processed quickly or in a lightweight manner,
23/// such as logging, simple transformations, or forwarding messages to other parts of the system.
24pub type LightweightHandler<S> = Box<
25 dyn Fn(
26 Arc<Message>,
27 Arc<S>,
28 &AsyncSender<Message>,
29 ) -> futures_util::future::BoxFuture<'static, CoreResult<()>>
30 + Send
31 + Sync,
32>;
33
34type RuleTp = (Box<dyn Rule + Send + Sync>, AsyncSender<Arc<Message>>);
35// --- Control Commands for the Runner ---
36
/// Control messages sent from a client handle to the background runner.
#[derive(Debug)]
pub enum RunnerCommand {
    /// End the current session and make the next connection a fresh ("hard") connect.
    Disconnect,
    /// End the current session and stop the runner for good.
    Shutdown,
    /// Ask the runner to establish a connection.
    Connect,
    /// Ask the runner for a "soft" reconnect.
    Reconnect,
}
45
46// --- Internal Router ---
47pub struct Router<S: AppState> {
48 pub(crate) state: Arc<S>,
49 pub(crate) module_rules: Vec<RuleTp>,
50 pub(crate) module_set: JoinSet<()>,
51 pub(crate) lightweight_rules: Vec<RuleTp>,
52 pub(crate) lightweight_handlers: Vec<LightweightHandler<S>>,
53 pub(crate) lightweight_set: JoinSet<()>,
54 pub(crate) middleware_stack: MiddlewareStack<S>,
55}
56
57impl<S: AppState> Router<S> {
58 pub fn new(state: Arc<S>) -> Self {
59 Self {
60 state,
61 module_rules: Vec::new(),
62 module_set: JoinSet::new(),
63 lightweight_rules: Vec::new(),
64 lightweight_handlers: Vec::new(),
65 lightweight_set: JoinSet::new(),
66 middleware_stack: MiddlewareStack::new(),
67 }
68 }
69
70 pub fn spawn_module<F: Future<Output = ()> + Send + 'static>(&mut self, task: F) {
71 self.module_set.spawn(task);
72 }
73
74 pub fn add_module_rule(
75 &mut self,
76 rule: Box<dyn Rule + Send + Sync>,
77 sender: AsyncSender<Arc<Message>>,
78 ) {
79 self.module_rules.push((rule, sender));
80 }
81
82 pub fn add_lightweight_rule(
83 &mut self,
84 rule: Box<dyn Rule + Send + Sync>,
85 sender: AsyncSender<Arc<Message>>,
86 ) {
87 self.lightweight_rules.push((rule, sender));
88 }
89
90 pub fn add_lightweight_handler(&mut self, handler: LightweightHandler<S>) {
91 self.lightweight_handlers.push(handler);
92 }
93
94 pub fn spawn_lightweight_module<F: Future<Output = ()> + Send + 'static>(&mut self, task: F) {
95 self.lightweight_set.spawn(task);
96 }
97
98 /// Routes incoming WebSocket messages to appropriate handlers and modules.
99 ///
100 /// This method implements the core message routing logic with middleware integration:
101 /// 1. **Middleware on_receive**: Called first for all incoming messages
102 /// 2. **Lightweight handlers**: Processed for quick operations
103 /// 3. **Lightweight modules**: Routed based on routing rules
104 /// 4. **API modules**: Routed to matching modules
105 ///
106 /// # Middleware Integration
107 /// The `on_receive` middleware hook is called at the beginning of message processing,
108 /// allowing middleware to observe, log, or transform incoming messages before they
109 /// reach the application logic.
110 ///
111 /// # Arguments
112 /// - `message`: The incoming WebSocket message wrapped in Arc for sharing
113 /// - `sender`: Channel for sending outgoing messages
114 async fn route(&self, message: Arc<Message>, sender: &AsyncSender<Message>) -> CoreResult<()> {
115 // Route to all lightweight handlers first
116 debug!(target: "Router", "Routing message: {message:?}");
117
118 // Create middleware context
119 let middleware_context = MiddlewareContext::new(Arc::clone(&self.state), sender.clone());
120
121 // 🎯 MIDDLEWARE HOOK: on_receive - called for ALL incoming messages
122 // This is where middleware can observe, log, or process incoming messages
123 self.middleware_stack
124 .on_receive(&message, &middleware_context)
125 .await;
126
127 for handler in &self.lightweight_handlers {
128 if let Err(err) = handler(Arc::clone(&message), Arc::clone(&self.state), sender).await {
129 error!(target: "Router",
130 "Lightweight handler error: {err:#?}"
131 );
132 }
133 }
134 for (rule, sender) in &self.lightweight_rules {
135 // If the rule matches, send the message to the lightweight handler
136 if rule.call(&message) && sender.send(message.clone()).await.is_err() {
137 error!(target: "Router", "A lightweight module has shut down and its channel is closed.");
138 }
139 }
140
141 // Route to the first matching API module
142 for (rule, sender) in &self.module_rules {
143 if rule.call(&message) && sender.send(message.clone()).await.is_err() {
144 error!(target: "Router", "A module has shut down and its channel is closed.");
145 }
146 }
147 Ok(())
148 }
149}
150
151// --- The Public-Facing Handle ---
152#[derive(Debug)]
153pub struct Client<S: AppState> {
154 pub signal: Signals,
155 /// The shared application state, which can be used by modules and handlers.
156 pub state: Arc<S>,
157 pub module_handles: Arc<RwLock<HashMap<TypeId, Box<dyn Any + Send + Sync>>>>,
158 pub to_ws_sender: AsyncSender<Message>,
159
160 runner_command_tx: AsyncSender<RunnerCommand>,
161}
162
163impl<S: AppState> Clone for Client<S> {
164 fn clone(&self) -> Self {
165 Self {
166 signal: self.signal.clone(),
167 state: Arc::clone(&self.state),
168 module_handles: Arc::clone(&self.module_handles),
169 runner_command_tx: self.runner_command_tx.clone(),
170 to_ws_sender: self.to_ws_sender.clone(),
171 }
172 }
173}
174
175impl<S: AppState> Client<S> {
176 // In a real implementation, this would be created by the builder.
177 pub fn new(
178 signal: Signals,
179 runner_command_tx: AsyncSender<RunnerCommand>,
180 state: Arc<S>,
181 sender: AsyncSender<Message>,
182 ) -> Self {
183 Self {
184 signal,
185 state,
186 module_handles: Arc::new(RwLock::new(HashMap::new())),
187 runner_command_tx,
188 to_ws_sender: sender,
189 }
190 }
191
192 /// Waits until the client is connected to the WebSocket server.
193 /// This method will block until the connection is established.
194 /// It is useful for ensuring that the client is ready to send and receive messages.
195 pub async fn wait_connected(&self) {
196 self.signal.wait_connected().await
197 }
198
199 /// Checks if the client is connected to the WebSocket server.
200 pub fn is_connected(&self) -> bool {
201 self.signal.is_connected()
202 }
203
204 /// Retrieves a clonable, typed handle to an already-registered module.
205 pub async fn get_handle<M: ApiModule<S>>(&self) -> Option<M::Handle> {
206 let handles = self.module_handles.read().await;
207 handles
208 .get(&TypeId::of::<M>())
209 .and_then(|boxed_handle| boxed_handle.downcast_ref::<M::Handle>())
210 .cloned()
211 }
212
213 /// Commands the runner to disconnect, clear state, and perform a "hard" reconnect.
214 pub async fn disconnect(&self) -> CoreResult<()> {
215 Ok(self
216 .runner_command_tx
217 .send(RunnerCommand::Disconnect)
218 .await?)
219 }
220
221 /// Commands the runner to disconnect, and perform a "soft" reconnect.
222 pub async fn reconnect(&self) -> CoreResult<()> {
223 Ok(self
224 .runner_command_tx
225 .send(RunnerCommand::Reconnect)
226 .await?)
227 }
228
229 /// Commands the runner to shutdown, this action is final as the runner and client will stop working and will be dropped.
230 pub async fn shutdown(self) -> CoreResult<()> {
231 self.runner_command_tx
232 .send(RunnerCommand::Shutdown)
233 .await
234 .inspect_err(|e| {
235 error!(target: "Client", "Failed to send shutdown command: {e}");
236 })?;
237 drop(self);
238 info!(target: "Client", "Runner shutdown command sent.");
239 Ok(())
240 }
241
242 /// Send a message to the WebSocket
243 pub async fn send_message(&self, message: Message) -> CoreResult<()> {
244 self.to_ws_sender.send(message).await.inspect_err(|e| {
245 error!(target: "Client", "Failed to send message to WebSocket: {e}");
246 })?;
247 Ok(())
248 }
249
250 /// Send a text message to the WebSocket
251 pub async fn send_text(&self, text: String) -> CoreResult<()> {
252 self.send_message(Message::text(text)).await
253 }
254
255 /// Send a binary message to the WebSocket
256 pub async fn send_binary(&self, data: Vec<u8>) -> CoreResult<()> {
257 self.send_message(Message::binary(data)).await
258 }
259}
260
261// --- The Background Worker ---
262/// Implementation of the `ClientRunner` for managing WebSocket client connections and session lifecycle.
263///
264/// # Type Parameters
265/// - `S`: The application state type, which must implement the `AppState` trait.
266///
267/// # Methods
268///
269/// ## `new`
270/// Constructs a new `ClientRunner` instance.
271///
272/// ### Arguments
273/// - `connector`: An `Arc` to a type implementing the `Connector` trait, responsible for establishing connections.
274/// - `state`: An `Arc` to the application state.
275/// - `router`: An `Arc` to the message `Router`.
276/// - `to_ws_sender`: An asynchronous sender for outgoing WebSocket messages.
277/// - `to_ws_receiver`: An asynchronous receiver for outgoing WebSocket messages.
278/// - `runner_command_rx`: An asynchronous receiver for runner commands (e.g., disconnect, shutdown).
279/// - `connection_callback`: Callbacks to execute on connect and reconnect events.
280///
281/// ## `run`
282/// Asynchronously runs the main client loop, managing connection cycles, message routing, and command handling.
283///
284/// - Continuously attempts to connect or reconnect to the WebSocket server until a shutdown is requested.
285/// - On successful connection, executes the appropriate connection callback (`on_connect` or `on_reconnect`).
286/// - Spawns writer and reader tasks for handling outgoing and incoming WebSocket messages.
287/// - Listens for runner commands (e.g., disconnect, shutdown) and manages session state accordingly.
288/// - Handles unexpected connection loss and retries connection as needed.
289/// - Cleans up resources and tasks on disconnect or shutdown.
290///
291/// # Behavior
292/// - Uses a hard connect or reconnect based on the internal state.
293/// - Retries connection attempts with a delay on failure.
294/// - Ensures proper cleanup of tasks and state on disconnect or shutdown.
295/// - Prints status messages for key events and errors.
296pub struct ClientRunner<S: AppState> {
297 /// Notify the client of connection status changes.
298 pub(crate) signal: Signals,
299 pub(crate) connector: Arc<dyn Connector<S>>,
300 pub(crate) router: Arc<Router<S>>,
301 pub(crate) state: Arc<S>,
302 // Flag to determine if the next connection is a fresh one.
303 pub(crate) is_hard_disconnect: bool,
304 // Flag to terminate the main run loop.
305 pub(crate) shutdown_requested: bool,
306
307 pub(crate) connection_callback: ConnectionCallback<S>,
308 pub(crate) to_ws_sender: AsyncSender<Message>,
309 pub(crate) to_ws_receiver: AsyncReceiver<Message>,
310 pub(crate) runner_command_rx: AsyncReceiver<RunnerCommand>,
311}
312
313impl<S: AppState> ClientRunner<S> {
314 /// Main client runner loop that manages WebSocket connections and message processing.
315 ///
316 /// # Middleware Integration Points
317 ///
318 /// This method integrates middleware at four key points:
319 ///
320 /// 1. **Connection Establishment** (`on_connect`): Called after successful connection
321 /// 2. **Message Sending** (`on_send`): Called before each message is sent to WebSocket
322 /// 3. **Message Receiving** (`on_receive`): Called for each incoming message (in Router::route)
323 /// 4. **Disconnection** (`on_disconnect`): Called on manual disconnect, shutdown, or connection loss
324 ///
325 /// # Connection Lifecycle
326 ///
327 /// - **Connection**: Middleware `on_connect` is called after successful WebSocket connection
328 /// - **Active Session**: Middleware `on_send`/`on_receive` called for each message
329 /// - **Disconnection**: Middleware `on_disconnect` called before cleanup
330 pub async fn run(&mut self) {
331 // TODO: Add a way to disconnect and keep the connection closed intill specified otherwhise
332 // The outermost loop runs until a shutdown is commanded.
333 while !self.shutdown_requested {
334 // Execute middleware on_connect hook
335 let middleware_context =
336 MiddlewareContext::new(Arc::clone(&self.state), self.to_ws_sender.clone());
337 info!(target: "Runner", "Starting connection cycle...");
338
339 // Call middleware to record connection attempt
340 self.router
341 .middleware_stack
342 .record_connection_attempt(&middleware_context)
343 .await;
344
345 // Use the correct connection method based on the flag.
346 let stream_result = if self.is_hard_disconnect {
347 self.connector.connect(self.state.clone()).await
348 } else {
349 self.connector.reconnect(self.state.clone()).await
350 };
351
352 let ws_stream = match stream_result {
353 Ok(stream) => stream,
354 Err(e) => {
355 warn!(target: "Runner", "Connection failed: {e}. Retrying in 5s...");
356 tokio::time::sleep(std::time::Duration::from_secs(5)).await;
357 // On failure, the next attempt is a reconnect, not a hard connect.
358 self.is_hard_disconnect = false;
359 continue; // Restart the connection cycle.
360 }
361 };
362
363 // 🎯 MIDDLEWARE HOOK: on_connect - called after successful connection
364 // Location: After WebSocket connection is established
365 info!(target: "Runner", "Connection successful.");
366 self.signal.set_connected();
367 self.router
368 .middleware_stack
369 .on_connect(&middleware_context)
370 .await;
371
372 // Execute the correct callback.
373 if self.is_hard_disconnect {
374 info!(target: "Runner", "Executing on_connect callback.");
375 // Handle any error from on_connect
376 if let Err(err) =
377 (self.connection_callback.on_connect)(self.state.clone(), &self.to_ws_sender)
378 .await
379 {
380 warn!(
381 target: "Runner",
382 "on_connect callback failed: {err:#?}"
383 );
384 }
385 } else {
386 info!(target: "Runner", "Executing on_reconnect callback.");
387 // Handle any error from on_reconnect
388 if let Err(err) = self
389 .connection_callback
390 .on_reconnect
391 .call(self.state.clone(), &self.to_ws_sender)
392 .await
393 {
394 warn!(
395 target: "Runner",
396 "on_reconnect callback failed: {err:#?}"
397 );
398 }
399 } // A successful connection means the next one is a "reconnect" unless told otherwise.
400 self.is_hard_disconnect = false;
401
402 let (mut ws_writer, mut ws_reader) = ws_stream.split();
403
404 // 🎯 MIDDLEWARE HOOK: on_send - called in writer task for outgoing messages
405 let writer_task = tokio::spawn({
406 let to_ws_rx = self.to_ws_receiver.clone();
407 let router = Arc::clone(&self.router);
408 let state = Arc::clone(&self.state);
409 let to_ws_sender = self.to_ws_sender.clone();
410 async move {
411 let middleware_context = MiddlewareContext::new(state, to_ws_sender);
412 while let Ok(msg) = to_ws_rx.recv().await {
413 // Execute middleware on_send hook
414 router
415 .middleware_stack
416 .on_send(&msg, &middleware_context)
417 .await;
418 if ws_writer.send(msg).await.is_err() {
419 error!(target: "Runner", "WebSocket writer task failed to send message.");
420 break;
421 }
422 }
423 }
424 });
425
426 let reader_task = tokio::spawn({
427 let to_ws_sender = self.to_ws_sender.clone();
428 let router = Arc::clone(&self.router); // Use Arc for sharing
429 async move {
430 while let Some(Ok(msg)) = ws_reader.next().await {
431 if let Err(e) = router.route(Arc::new(msg), &to_ws_sender).await {
432 warn!(target: "Router", "Error routing message: {:?}", e);
433 }
434 }
435 }
436 });
437
438 // --- Active Session Loop ---
439 // This loop runs as long as the connection is stable or no commands are received.
440 let mut writer_task_opt = Some(writer_task);
441 let mut reader_task_opt: Option<tokio::task::JoinHandle<()>> = Some(reader_task);
442
443 let mut session_active = true;
444
445 // Temporal timer so we i can check the duration of a connection
446 // let temporal_timer = std::time::Instant::now();
447 while session_active {
448 tokio::select! {
449 biased;
450
451 Ok(cmd) = self.runner_command_rx.recv() => {
452 match cmd {
453 RunnerCommand::Disconnect => {
454 // 🎯 MIDDLEWARE HOOK: on_disconnect - manual disconnect
455
456 info!(target: "Runner", "Disconnect command received.");
457
458 // Execute middleware on_disconnect hook
459 let middleware_context = MiddlewareContext::new(Arc::clone(&self.state), self.to_ws_sender.clone());
460 self.router.middleware_stack.on_disconnect(&middleware_context).await;
461
462 // Call connector's disconnect method to properly close the connection
463 if let Err(e) = self.connector.disconnect().await {
464 warn!(target: "Runner", "Connector disconnect failed: {e}");
465 }
466
467
468 self.state.clear_temporal_data().await;
469 self.is_hard_disconnect = true;
470 if let Some(writer_task) = writer_task_opt.take() {
471 writer_task.abort();
472 }
473 if let Some(reader_task) = reader_task_opt.take() {
474 reader_task.abort();
475 }
476 self.signal.set_disconnected();
477 session_active = false;
478 },
479 RunnerCommand::Shutdown => {
480 // 🎯 MIDDLEWARE HOOK: on_disconnect - shutdown
481
482 info!(target: "Runner", "Shutdown command received.");
483
484 // Execute middleware on_disconnect hook
485 let middleware_context = MiddlewareContext::new(Arc::clone(&self.state), self.to_ws_sender.clone());
486 self.router.middleware_stack.on_disconnect(&middleware_context).await;
487
488 // Call connector's disconnect method to properly close the connection
489 if let Err(e) = self.connector.disconnect().await {
490 warn!(target: "Runner", "Connector disconnect failed: {e}");
491 }
492
493 self.shutdown_requested = true;
494 if let Some(writer_task) = writer_task_opt.take() {
495 writer_task.abort();
496 }
497 if let Some(reader_task) = reader_task_opt.take() {
498 reader_task.abort();
499 }
500 self.signal.set_disconnected();
501 session_active = false;
502 }
503 _ => {}
504 }
505 },
506 _ = async {
507 if let Some(reader_task) = &mut reader_task_opt {
508 let _ = reader_task.await;
509 }
510 } => {
511 // 🎯 MIDDLEWARE HOOK: on_disconnect - unexpected connection loss
512 warn!(target: "Runner", "Connection lost unexpectedly.");
513
514 // Execute middleware on_disconnect hook
515 let middleware_context = MiddlewareContext::new(Arc::clone(&self.state), self.to_ws_sender.clone());
516 self.router.middleware_stack.on_disconnect(&middleware_context).await;
517
518 if let Some(writer_task) = writer_task_opt.take() {
519 writer_task.abort();
520 }
521 if let Some(reader_task) = reader_task_opt.take() {
522 // Already finished, but abort for completeness
523 reader_task.abort();
524 }
525 self.signal.set_disconnected();
526 session_active = false;
527 // panic!("Connection lost unexpectedly, exiting session loop. Duration: {:?}", temporal_timer.elapsed());
528 }
529 }
530 }
531 }
532
533 info!(target: "Runner", "Shutdown complete.");
534 }
535}
536
537// A proper builder would be used here to configure and create the Client and ClientRunner