binary_options_tools_core_pre/
client.rs

1use crate::callback::ConnectionCallback;
2use crate::connector::Connector;
3use crate::error::CoreResult;
4use crate::middleware::{MiddlewareContext, MiddlewareStack};
5use crate::signals::Signals;
6use crate::traits::{ApiModule, AppState, ReconnectCallback, Rule};
7use futures_util::{SinkExt, stream::StreamExt};
8use kanal::{AsyncReceiver, AsyncSender};
9use std::any::{Any, TypeId};
10use std::collections::HashMap;
11use std::sync::Arc;
12use tokio::sync::RwLock;
13use tokio::task::JoinSet;
14use tokio_tungstenite::tungstenite::Message;
15use tracing::{debug, error, info, warn};
16
17/// A lightweight handler is a function that can process messages without being tied to a specific module.
18/// It can be used for quick, non-blocking operations that don't require a full module lifecycle
19/// or state management.
20/// It takes a message, the shared application state, and a sender for outgoing messages.
21/// It returns a future that resolves to a `CoreResult<()>`, indicating success or failure.
22/// This is useful for handling messages that need to be processed quickly or in a lightweight manner,
23/// such as logging, simple transformations, or forwarding messages to other parts of the system.
24pub type LightweightHandler<S> = Box<
25    dyn Fn(
26            Arc<Message>,
27            Arc<S>,
28            &AsyncSender<Message>,
29        ) -> futures_util::future::BoxFuture<'static, CoreResult<()>>
30        + Send
31        + Sync,
32>;
33
34type RuleTp = (Box<dyn Rule + Send + Sync>, AsyncSender<Arc<Message>>);
35// --- Control Commands for the Runner ---
36
37#[derive(Debug)]
38pub enum RunnerCommand {
39    Disconnect,
40    Shutdown, // This can be used to gracefully shut down the runner
41    Connect,
42    Reconnect,
43    // You can add more commands like Shutdown in the future
44}
45
46// --- Internal Router ---
47pub struct Router<S: AppState> {
48    pub(crate) state: Arc<S>,
49    pub(crate) module_rules: Vec<RuleTp>,
50    pub(crate) module_set: JoinSet<()>,
51    pub(crate) lightweight_rules: Vec<RuleTp>,
52    pub(crate) lightweight_handlers: Vec<LightweightHandler<S>>,
53    pub(crate) lightweight_set: JoinSet<()>,
54    pub(crate) middleware_stack: MiddlewareStack<S>,
55}
56
57impl<S: AppState> Router<S> {
58    pub fn new(state: Arc<S>) -> Self {
59        Self {
60            state,
61            module_rules: Vec::new(),
62            module_set: JoinSet::new(),
63            lightweight_rules: Vec::new(),
64            lightweight_handlers: Vec::new(),
65            lightweight_set: JoinSet::new(),
66            middleware_stack: MiddlewareStack::new(),
67        }
68    }
69
70    pub fn spawn_module<F: Future<Output = ()> + Send + 'static>(&mut self, task: F) {
71        self.module_set.spawn(task);
72    }
73
74    pub fn add_module_rule(
75        &mut self,
76        rule: Box<dyn Rule + Send + Sync>,
77        sender: AsyncSender<Arc<Message>>,
78    ) {
79        self.module_rules.push((rule, sender));
80    }
81
82    pub fn add_lightweight_rule(
83        &mut self,
84        rule: Box<dyn Rule + Send + Sync>,
85        sender: AsyncSender<Arc<Message>>,
86    ) {
87        self.lightweight_rules.push((rule, sender));
88    }
89
90    pub fn add_lightweight_handler(&mut self, handler: LightweightHandler<S>) {
91        self.lightweight_handlers.push(handler);
92    }
93
94    pub fn spawn_lightweight_module<F: Future<Output = ()> + Send + 'static>(&mut self, task: F) {
95        self.lightweight_set.spawn(task);
96    }
97
98    /// Routes incoming WebSocket messages to appropriate handlers and modules.
99    ///
100    /// This method implements the core message routing logic with middleware integration:
101    /// 1. **Middleware on_receive**: Called first for all incoming messages
102    /// 2. **Lightweight handlers**: Processed for quick operations
103    /// 3. **Lightweight modules**: Routed based on routing rules
104    /// 4. **API modules**: Routed to matching modules
105    ///
106    /// # Middleware Integration
107    /// The `on_receive` middleware hook is called at the beginning of message processing,
108    /// allowing middleware to observe, log, or transform incoming messages before they
109    /// reach the application logic.
110    ///
111    /// # Arguments
112    /// - `message`: The incoming WebSocket message wrapped in Arc for sharing
113    /// - `sender`: Channel for sending outgoing messages
114    async fn route(&self, message: Arc<Message>, sender: &AsyncSender<Message>) -> CoreResult<()> {
115        // Route to all lightweight handlers first
116        debug!(target: "Router", "Routing message: {message:?}");
117
118        // Create middleware context
119        let middleware_context = MiddlewareContext::new(Arc::clone(&self.state), sender.clone());
120
121        // 🎯 MIDDLEWARE HOOK: on_receive - called for ALL incoming messages
122        // This is where middleware can observe, log, or process incoming messages
123        self.middleware_stack
124            .on_receive(&message, &middleware_context)
125            .await;
126
127        for handler in &self.lightweight_handlers {
128            if let Err(err) = handler(Arc::clone(&message), Arc::clone(&self.state), sender).await {
129                error!(target: "Router",
130                     "Lightweight handler error: {err:#?}"
131                );
132            }
133        }
134        for (rule, sender) in &self.lightweight_rules {
135            // If the rule matches, send the message to the lightweight handler
136            if rule.call(&message) && sender.send(message.clone()).await.is_err() {
137                error!(target: "Router", "A lightweight module has shut down and its channel is closed.");
138            }
139        }
140
141        // Route to the first matching API module
142        for (rule, sender) in &self.module_rules {
143            if rule.call(&message) && sender.send(message.clone()).await.is_err() {
144                error!(target: "Router", "A module has shut down and its channel is closed.");
145            }
146        }
147        Ok(())
148    }
149}
150
151// --- The Public-Facing Handle ---
152#[derive(Debug)]
153pub struct Client<S: AppState> {
154    pub signal: Signals,
155    /// The shared application state, which can be used by modules and handlers.
156    pub state: Arc<S>,
157    pub module_handles: Arc<RwLock<HashMap<TypeId, Box<dyn Any + Send + Sync>>>>,
158    pub to_ws_sender: AsyncSender<Message>,
159
160    runner_command_tx: AsyncSender<RunnerCommand>,
161}
162
163impl<S: AppState> Clone for Client<S> {
164    fn clone(&self) -> Self {
165        Self {
166            signal: self.signal.clone(),
167            state: Arc::clone(&self.state),
168            module_handles: Arc::clone(&self.module_handles),
169            runner_command_tx: self.runner_command_tx.clone(),
170            to_ws_sender: self.to_ws_sender.clone(),
171        }
172    }
173}
174
175impl<S: AppState> Client<S> {
176    // In a real implementation, this would be created by the builder.
177    pub fn new(
178        signal: Signals,
179        runner_command_tx: AsyncSender<RunnerCommand>,
180        state: Arc<S>,
181        sender: AsyncSender<Message>,
182    ) -> Self {
183        Self {
184            signal,
185            state,
186            module_handles: Arc::new(RwLock::new(HashMap::new())),
187            runner_command_tx,
188            to_ws_sender: sender,
189        }
190    }
191
192    /// Waits until the client is connected to the WebSocket server.
193    /// This method will block until the connection is established.
194    /// It is useful for ensuring that the client is ready to send and receive messages.
195    pub async fn wait_connected(&self) {
196        self.signal.wait_connected().await
197    }
198
199    /// Checks if the client is connected to the WebSocket server.
200    pub fn is_connected(&self) -> bool {
201        self.signal.is_connected()
202    }
203
204    /// Retrieves a clonable, typed handle to an already-registered module.
205    pub async fn get_handle<M: ApiModule<S>>(&self) -> Option<M::Handle> {
206        let handles = self.module_handles.read().await;
207        handles
208            .get(&TypeId::of::<M>())
209            .and_then(|boxed_handle| boxed_handle.downcast_ref::<M::Handle>())
210            .cloned()
211    }
212
213    /// Commands the runner to disconnect, clear state, and perform a "hard" reconnect.
214    pub async fn disconnect(&self) -> CoreResult<()> {
215        Ok(self
216            .runner_command_tx
217            .send(RunnerCommand::Disconnect)
218            .await?)
219    }
220
221    /// Commands the runner to disconnect, and perform a "soft" reconnect.
222    pub async fn reconnect(&self) -> CoreResult<()> {
223        Ok(self
224            .runner_command_tx
225            .send(RunnerCommand::Reconnect)
226            .await?)
227    }
228
229    /// Commands the runner to shutdown, this action is final as the runner and client will stop working and will be dropped.
230    pub async fn shutdown(self) -> CoreResult<()> {
231        self.runner_command_tx
232            .send(RunnerCommand::Shutdown)
233            .await
234            .inspect_err(|e| {
235                error!(target: "Client", "Failed to send shutdown command: {e}");
236            })?;
237        drop(self);
238        info!(target: "Client", "Runner shutdown command sent.");
239        Ok(())
240    }
241
242    /// Send a message to the WebSocket
243    pub async fn send_message(&self, message: Message) -> CoreResult<()> {
244        self.to_ws_sender.send(message).await.inspect_err(|e| {
245            error!(target: "Client", "Failed to send message to WebSocket: {e}");
246        })?;
247        Ok(())
248    }
249
250    /// Send a text message to the WebSocket
251    pub async fn send_text(&self, text: String) -> CoreResult<()> {
252        self.send_message(Message::text(text)).await
253    }
254
255    /// Send a binary message to the WebSocket
256    pub async fn send_binary(&self, data: Vec<u8>) -> CoreResult<()> {
257        self.send_message(Message::binary(data)).await
258    }
259}
260
261// --- The Background Worker ---
262/// Implementation of the `ClientRunner` for managing WebSocket client connections and session lifecycle.
263///
264/// # Type Parameters
265/// - `S`: The application state type, which must implement the `AppState` trait.
266///
267/// # Methods
268///
269/// ## `new`
270/// Constructs a new `ClientRunner` instance.
271///
272/// ### Arguments
273/// - `connector`: An `Arc` to a type implementing the `Connector` trait, responsible for establishing connections.
274/// - `state`: An `Arc` to the application state.
275/// - `router`: An `Arc` to the message `Router`.
276/// - `to_ws_sender`: An asynchronous sender for outgoing WebSocket messages.
277/// - `to_ws_receiver`: An asynchronous receiver for outgoing WebSocket messages.
278/// - `runner_command_rx`: An asynchronous receiver for runner commands (e.g., disconnect, shutdown).
279/// - `connection_callback`: Callbacks to execute on connect and reconnect events.
280///
281/// ## `run`
282/// Asynchronously runs the main client loop, managing connection cycles, message routing, and command handling.
283///
284/// - Continuously attempts to connect or reconnect to the WebSocket server until a shutdown is requested.
285/// - On successful connection, executes the appropriate connection callback (`on_connect` or `on_reconnect`).
286/// - Spawns writer and reader tasks for handling outgoing and incoming WebSocket messages.
287/// - Listens for runner commands (e.g., disconnect, shutdown) and manages session state accordingly.
288/// - Handles unexpected connection loss and retries connection as needed.
289/// - Cleans up resources and tasks on disconnect or shutdown.
290///
291/// # Behavior
292/// - Uses a hard connect or reconnect based on the internal state.
293/// - Retries connection attempts with a delay on failure.
294/// - Ensures proper cleanup of tasks and state on disconnect or shutdown.
295/// - Prints status messages for key events and errors.
296pub struct ClientRunner<S: AppState> {
297    /// Notify the client of connection status changes.
298    pub(crate) signal: Signals,
299    pub(crate) connector: Arc<dyn Connector<S>>,
300    pub(crate) router: Arc<Router<S>>,
301    pub(crate) state: Arc<S>,
302    // Flag to determine if the next connection is a fresh one.
303    pub(crate) is_hard_disconnect: bool,
304    // Flag to terminate the main run loop.
305    pub(crate) shutdown_requested: bool,
306
307    pub(crate) connection_callback: ConnectionCallback<S>,
308    pub(crate) to_ws_sender: AsyncSender<Message>,
309    pub(crate) to_ws_receiver: AsyncReceiver<Message>,
310    pub(crate) runner_command_rx: AsyncReceiver<RunnerCommand>,
311}
312
313impl<S: AppState> ClientRunner<S> {
314    /// Main client runner loop that manages WebSocket connections and message processing.
315    ///
316    /// # Middleware Integration Points
317    ///
318    /// This method integrates middleware at four key points:
319    ///
320    /// 1. **Connection Establishment** (`on_connect`): Called after successful connection
321    /// 2. **Message Sending** (`on_send`): Called before each message is sent to WebSocket
322    /// 3. **Message Receiving** (`on_receive`): Called for each incoming message (in Router::route)
323    /// 4. **Disconnection** (`on_disconnect`): Called on manual disconnect, shutdown, or connection loss
324    ///
325    /// # Connection Lifecycle
326    ///
327    /// - **Connection**: Middleware `on_connect` is called after successful WebSocket connection
328    /// - **Active Session**: Middleware `on_send`/`on_receive` called for each message
329    /// - **Disconnection**: Middleware `on_disconnect` called before cleanup
330    pub async fn run(&mut self) {
331        // TODO: Add a way to disconnect and keep the connection closed intill specified otherwhise
332        // The outermost loop runs until a shutdown is commanded.
333        while !self.shutdown_requested {
334            // Execute middleware on_connect hook
335            let middleware_context =
336                MiddlewareContext::new(Arc::clone(&self.state), self.to_ws_sender.clone());
337            info!(target: "Runner", "Starting connection cycle...");
338
339            // Call middleware to record connection attempt
340            self.router
341                .middleware_stack
342                .record_connection_attempt(&middleware_context)
343                .await;
344
345            // Use the correct connection method based on the flag.
346            let stream_result = if self.is_hard_disconnect {
347                self.connector.connect(self.state.clone()).await
348            } else {
349                self.connector.reconnect(self.state.clone()).await
350            };
351
352            let ws_stream = match stream_result {
353                Ok(stream) => stream,
354                Err(e) => {
355                    warn!(target: "Runner", "Connection failed: {e}. Retrying in 5s...");
356                    tokio::time::sleep(std::time::Duration::from_secs(5)).await;
357                    // On failure, the next attempt is a reconnect, not a hard connect.
358                    self.is_hard_disconnect = false;
359                    continue; // Restart the connection cycle.
360                }
361            };
362
363            // 🎯 MIDDLEWARE HOOK: on_connect - called after successful connection
364            // Location: After WebSocket connection is established
365            info!(target: "Runner", "Connection successful.");
366            self.signal.set_connected();
367            self.router
368                .middleware_stack
369                .on_connect(&middleware_context)
370                .await;
371
372            // Execute the correct callback.
373            if self.is_hard_disconnect {
374                info!(target: "Runner", "Executing on_connect callback.");
375                // Handle any error from on_connect
376                if let Err(err) =
377                    (self.connection_callback.on_connect)(self.state.clone(), &self.to_ws_sender)
378                        .await
379                {
380                    warn!(
381                        target: "Runner",
382                        "on_connect callback failed: {err:#?}"
383                    );
384                }
385            } else {
386                info!(target: "Runner", "Executing on_reconnect callback.");
387                // Handle any error from on_reconnect
388                if let Err(err) = self
389                    .connection_callback
390                    .on_reconnect
391                    .call(self.state.clone(), &self.to_ws_sender)
392                    .await
393                {
394                    warn!(
395                        target: "Runner",
396                        "on_reconnect callback failed: {err:#?}"
397                    );
398                }
399            } // A successful connection means the next one is a "reconnect" unless told otherwise.
400            self.is_hard_disconnect = false;
401
402            let (mut ws_writer, mut ws_reader) = ws_stream.split();
403
404            // 🎯 MIDDLEWARE HOOK: on_send - called in writer task for outgoing messages
405            let writer_task = tokio::spawn({
406                let to_ws_rx = self.to_ws_receiver.clone();
407                let router = Arc::clone(&self.router);
408                let state = Arc::clone(&self.state);
409                let to_ws_sender = self.to_ws_sender.clone();
410                async move {
411                    let middleware_context = MiddlewareContext::new(state, to_ws_sender);
412                    while let Ok(msg) = to_ws_rx.recv().await {
413                        // Execute middleware on_send hook
414                        router
415                            .middleware_stack
416                            .on_send(&msg, &middleware_context)
417                            .await;
418                        if ws_writer.send(msg).await.is_err() {
419                            error!(target: "Runner", "WebSocket writer task failed to send message.");
420                            break;
421                        }
422                    }
423                }
424            });
425
426            let reader_task = tokio::spawn({
427                let to_ws_sender = self.to_ws_sender.clone();
428                let router = Arc::clone(&self.router); // Use Arc for sharing
429                async move {
430                    while let Some(Ok(msg)) = ws_reader.next().await {
431                        if let Err(e) = router.route(Arc::new(msg), &to_ws_sender).await {
432                            warn!(target: "Router", "Error routing message: {:?}", e);
433                        }
434                    }
435                }
436            });
437
438            // --- Active Session Loop ---
439            // This loop runs as long as the connection is stable or no commands are received.
440            let mut writer_task_opt = Some(writer_task);
441            let mut reader_task_opt: Option<tokio::task::JoinHandle<()>> = Some(reader_task);
442
443            let mut session_active = true;
444
445            // Temporal timer so we i can check the duration of a connection
446            // let temporal_timer = std::time::Instant::now();
447            while session_active {
448                tokio::select! {
449                    biased;
450
451                    Ok(cmd) = self.runner_command_rx.recv() => {
452                        match cmd {
453                            RunnerCommand::Disconnect => {
454                                // 🎯 MIDDLEWARE HOOK: on_disconnect - manual disconnect
455
456                                info!(target: "Runner", "Disconnect command received.");
457
458                                // Execute middleware on_disconnect hook
459                                let middleware_context = MiddlewareContext::new(Arc::clone(&self.state), self.to_ws_sender.clone());
460                                self.router.middleware_stack.on_disconnect(&middleware_context).await;
461
462                                // Call connector's disconnect method to properly close the connection
463                                if let Err(e) = self.connector.disconnect().await {
464                                    warn!(target: "Runner", "Connector disconnect failed: {e}");
465                                }
466
467
468                                self.state.clear_temporal_data().await;
469                                self.is_hard_disconnect = true;
470                                if let Some(writer_task) = writer_task_opt.take() {
471                                    writer_task.abort();
472                                }
473                                if let Some(reader_task) = reader_task_opt.take() {
474                                    reader_task.abort();
475                                }
476                                self.signal.set_disconnected();
477                                session_active = false;
478                            },
479                            RunnerCommand::Shutdown => {
480                                // 🎯 MIDDLEWARE HOOK: on_disconnect - shutdown
481
482                                info!(target: "Runner", "Shutdown command received.");
483
484                                // Execute middleware on_disconnect hook
485                                let middleware_context = MiddlewareContext::new(Arc::clone(&self.state), self.to_ws_sender.clone());
486                                self.router.middleware_stack.on_disconnect(&middleware_context).await;
487
488                                // Call connector's disconnect method to properly close the connection
489                                if let Err(e) = self.connector.disconnect().await {
490                                    warn!(target: "Runner", "Connector disconnect failed: {e}");
491                                }
492
493                                self.shutdown_requested = true;
494                                if let Some(writer_task) = writer_task_opt.take() {
495                                    writer_task.abort();
496                                }
497                                if let Some(reader_task) = reader_task_opt.take() {
498                                    reader_task.abort();
499                                }
500                                self.signal.set_disconnected();
501                                session_active = false;
502                            }
503                            _ => {}
504                        }
505                    },
506                    _ = async {
507                        if let Some(reader_task) = &mut reader_task_opt {
508                            let _ = reader_task.await;
509                        }
510                    } => {
511                        // 🎯 MIDDLEWARE HOOK: on_disconnect - unexpected connection loss
512                        warn!(target: "Runner", "Connection lost unexpectedly.");
513
514                        // Execute middleware on_disconnect hook
515                        let middleware_context = MiddlewareContext::new(Arc::clone(&self.state), self.to_ws_sender.clone());
516                        self.router.middleware_stack.on_disconnect(&middleware_context).await;
517
518                        if let Some(writer_task) = writer_task_opt.take() {
519                            writer_task.abort();
520                        }
521                        if let Some(reader_task) = reader_task_opt.take() {
522                            // Already finished, but abort for completeness
523                            reader_task.abort();
524                        }
525                        self.signal.set_disconnected();
526                        session_active = false;
527                        // panic!("Connection lost unexpectedly, exiting session loop. Duration: {:?}", temporal_timer.elapsed());
528                    }
529                }
530            }
531        }
532
533        info!(target: "Runner", "Shutdown complete.");
534    }
535}
536
537// A proper builder would be used here to configure and create the Client and ClientRunner