Skip to main content

middleware_example/
middleware_example.rs

1use async_trait::async_trait;
2use binary_options_tools_core_pre::builder::ClientBuilder;
3use binary_options_tools_core_pre::connector::{Connector, ConnectorResult, WsStream};
4use binary_options_tools_core_pre::error::CoreResult;
5use binary_options_tools_core_pre::middleware::{MiddlewareContext, WebSocketMiddleware};
6use binary_options_tools_core_pre::traits::{ApiModule, AppState, Rule, RunnerCommand};
7use kanal::{AsyncReceiver, AsyncSender};
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::Arc;
10use std::time::Duration;
11use tokio_tungstenite::tungstenite::Message;
12use tracing::info;
13
14#[derive(Debug)]
15struct ExampleState;
16
17#[async_trait]
18impl AppState for ExampleState {
19    async fn clear_temporal_data(&self) {}
20}
21
22// Example statistics middleware
23struct StatisticsMiddleware {
24    messages_sent: AtomicU64,
25    messages_received: AtomicU64,
26    bytes_sent: AtomicU64,
27    bytes_received: AtomicU64,
28    connections: AtomicU64,
29    disconnections: AtomicU64,
30}
31
32impl StatisticsMiddleware {
33    pub fn new() -> Self {
34        Self {
35            messages_sent: AtomicU64::new(0),
36            messages_received: AtomicU64::new(0),
37            bytes_sent: AtomicU64::new(0),
38            bytes_received: AtomicU64::new(0),
39            connections: AtomicU64::new(0),
40            disconnections: AtomicU64::new(0),
41        }
42    }
43
44    pub fn get_stats(&self) -> StatisticsReport {
45        StatisticsReport {
46            messages_sent: self.messages_sent.load(Ordering::Relaxed),
47            messages_received: self.messages_received.load(Ordering::Relaxed),
48            bytes_sent: self.bytes_sent.load(Ordering::Relaxed),
49            bytes_received: self.bytes_received.load(Ordering::Relaxed),
50            connections: self.connections.load(Ordering::Relaxed),
51            disconnections: self.disconnections.load(Ordering::Relaxed),
52        }
53    }
54}
55
56#[derive(Debug, Clone)]
57pub struct StatisticsReport {
58    pub messages_sent: u64,
59    pub messages_received: u64,
60    pub bytes_sent: u64,
61    pub bytes_received: u64,
62    pub connections: u64,
63    pub disconnections: u64,
64}
65
66#[async_trait]
67impl WebSocketMiddleware<ExampleState> for StatisticsMiddleware {
68    async fn on_send(
69        &self,
70        message: &Message,
71        _context: &MiddlewareContext<ExampleState>,
72    ) -> CoreResult<()> {
73        self.messages_sent.fetch_add(1, Ordering::Relaxed);
74
75        let size = match message {
76            Message::Text(text) => text.len() as u64,
77            Message::Binary(data) => data.len() as u64,
78            _ => 0,
79        };
80        self.bytes_sent.fetch_add(size, Ordering::Relaxed);
81
82        info!("Middleware: Sending message (size: {} bytes)", size);
83        Ok(())
84    }
85
86    async fn on_receive(
87        &self,
88        message: &Message,
89        _context: &MiddlewareContext<ExampleState>,
90    ) -> CoreResult<()> {
91        self.messages_received.fetch_add(1, Ordering::Relaxed);
92
93        let size = match message {
94            Message::Text(text) => text.len() as u64,
95            Message::Binary(data) => data.len() as u64,
96            _ => 0,
97        };
98        self.bytes_received.fetch_add(size, Ordering::Relaxed);
99
100        info!("Middleware: Received message (size: {} bytes)", size);
101        Ok(())
102    }
103
104    async fn on_connect(&self, _context: &MiddlewareContext<ExampleState>) -> CoreResult<()> {
105        self.connections.fetch_add(1, Ordering::Relaxed);
106        info!("Middleware: Connected to WebSocket");
107        Ok(())
108    }
109
110    async fn on_disconnect(&self, _context: &MiddlewareContext<ExampleState>) -> CoreResult<()> {
111        self.disconnections.fetch_add(1, Ordering::Relaxed);
112        info!("Middleware: Disconnected from WebSocket");
113        Ok(())
114    }
115}
116
117// Example logging middleware
118struct LoggingMiddleware;
119
120#[async_trait]
121impl WebSocketMiddleware<ExampleState> for LoggingMiddleware {
122    async fn on_send(
123        &self,
124        message: &Message,
125        _context: &MiddlewareContext<ExampleState>,
126    ) -> CoreResult<()> {
127        info!("Logging: Sending message: {:?}", message);
128        Ok(())
129    }
130
131    async fn on_receive(
132        &self,
133        message: &Message,
134        _context: &MiddlewareContext<ExampleState>,
135    ) -> CoreResult<()> {
136        info!("Logging: Received message: {:?}", message);
137        Ok(())
138    }
139
140    async fn on_connect(&self, _context: &MiddlewareContext<ExampleState>) -> CoreResult<()> {
141        info!("Logging: WebSocket connected");
142        Ok(())
143    }
144
145    async fn on_disconnect(&self, _context: &MiddlewareContext<ExampleState>) -> CoreResult<()> {
146        info!("Logging: WebSocket disconnected");
147        Ok(())
148    }
149}
150
151// Mock connector for demonstration
152struct MockConnector;
153
154#[async_trait]
155impl Connector<ExampleState> for MockConnector {
156    async fn connect(&self, _: Arc<ExampleState>) -> ConnectorResult<WsStream> {
157        // This would be a real WebSocket connection in practice
158        Err(
159            binary_options_tools_core_pre::connector::ConnectorError::Custom(
160                "Mock connector".to_string(),
161            ),
162        )
163    }
164
165    async fn disconnect(&self) -> ConnectorResult<()> {
166        Ok(())
167    }
168}
169
170// Example API module
171pub struct ExampleModule {
172    _msg_rx: AsyncReceiver<Arc<Message>>,
173}
174
175#[async_trait]
176impl ApiModule<ExampleState> for ExampleModule {
177    type Command = String;
178    type CommandResponse = String;
179    type Handle = ExampleHandle;
180
181    fn new(
182        _state: Arc<ExampleState>,
183        _cmd_rx: AsyncReceiver<Self::Command>,
184        _cmd_ret_tx: AsyncSender<Self::CommandResponse>,
185        msg_rx: AsyncReceiver<Arc<Message>>,
186        _to_ws: AsyncSender<Message>,
187        _: AsyncSender<RunnerCommand>,
188    ) -> Self {
189        Self { _msg_rx: msg_rx }
190    }
191
192    fn create_handle(
193        sender: AsyncSender<Self::Command>,
194        receiver: AsyncReceiver<Self::CommandResponse>,
195    ) -> Self::Handle {
196        ExampleHandle { sender, receiver }
197    }
198
199    async fn run(&mut self) -> CoreResult<()> {
200        // Example module logic
201        info!("Example module running");
202        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
203        Ok(())
204    }
205
206    fn rule(_: Arc<ExampleState>) -> Box<dyn Rule + Send + Sync> {
207        Box::new(move |_msg: &Message| true)
208    }
209}
210
211#[derive(Clone)]
212#[allow(dead_code)]
213pub struct ExampleHandle {
214    sender: AsyncSender<String>,
215    receiver: AsyncReceiver<String>,
216}
217
218#[tokio::main]
219async fn main() -> CoreResult<()> {
220    // Initialize tracing
221    tracing_subscriber::fmt::init();
222
223    // Create statistics middleware
224    let stats_middleware = Arc::new(StatisticsMiddleware::new());
225
226    // Build the client with middleware
227    let (client, _) = ClientBuilder::new(MockConnector, ExampleState)
228        .with_middleware(Box::new(LoggingMiddleware))
229        .with_middleware(Box::new(StatisticsMiddleware::new()))
230        .with_module::<ExampleModule>()
231        .build()
232        .await?;
233
234    info!("Client built with middleware layers");
235    tokio::time::sleep(Duration::from_secs(10)).await;
236    client.shutdown().await?;
237    // In a real application, you would:
238    // 1. Start the runner in a background task
239    // 2. Use the client to send messages
240    // 3. Check statistics periodically
241
242    // For demonstration, we'll just show the statistics
243    let stats = stats_middleware.get_stats();
244    info!("Current statistics: {:?}", stats);
245
246    Ok(())
247}