middleware_example/
middleware_example.rs1use async_trait::async_trait;
2use binary_options_tools_core_pre::builder::ClientBuilder;
3use binary_options_tools_core_pre::connector::{Connector, ConnectorResult, WsStream};
4use binary_options_tools_core_pre::error::CoreResult;
5use binary_options_tools_core_pre::middleware::{MiddlewareContext, WebSocketMiddleware};
6use binary_options_tools_core_pre::traits::{ApiModule, AppState, Rule, RunnerCommand};
7use kanal::{AsyncReceiver, AsyncSender};
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::Arc;
10use std::time::Duration;
11use tokio_tungstenite::tungstenite::Message;
12use tracing::info;
13
14#[derive(Debug)]
15struct ExampleState;
16
17#[async_trait]
18impl AppState for ExampleState {
19 async fn clear_temporal_data(&self) {}
20}
21
/// Middleware state that counts WebSocket traffic and connection events.
///
/// Every counter is an [`AtomicU64`], so it can be incremented through a
/// shared reference without additional locking.
#[derive(Debug, Default)]
struct StatisticsMiddleware {
    /// Number of outgoing messages observed.
    messages_sent: AtomicU64,
    /// Number of incoming messages observed.
    messages_received: AtomicU64,
    /// Accumulated size, in bytes, recorded for outgoing messages.
    bytes_sent: AtomicU64,
    /// Accumulated size, in bytes, recorded for incoming messages.
    bytes_received: AtomicU64,
    /// Number of connect events observed.
    connections: AtomicU64,
    /// Number of disconnect events observed.
    disconnections: AtomicU64,
}

impl StatisticsMiddleware {
    /// Creates a middleware whose counters all start at zero.
    ///
    /// Equivalent to [`StatisticsMiddleware::default`].
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns a plain-value copy of the current counters.
    ///
    /// Each counter is read with its own relaxed load, so the report is not
    /// one atomic snapshot: an update that lands between two loads may be
    /// visible in one field and not yet in another.
    pub fn get_stats(&self) -> StatisticsReport {
        let read = |counter: &AtomicU64| counter.load(Ordering::Relaxed);

        StatisticsReport {
            messages_sent: read(&self.messages_sent),
            messages_received: read(&self.messages_received),
            bytes_sent: read(&self.bytes_sent),
            bytes_received: read(&self.bytes_received),
            connections: read(&self.connections),
            disconnections: read(&self.disconnections),
        }
    }
}

/// Point-in-time copy of the counters held by [`StatisticsMiddleware`].
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct StatisticsReport {
    /// Number of outgoing messages observed.
    pub messages_sent: u64,
    /// Number of incoming messages observed.
    pub messages_received: u64,
    /// Accumulated size, in bytes, recorded for outgoing messages.
    pub bytes_sent: u64,
    /// Accumulated size, in bytes, recorded for incoming messages.
    pub bytes_received: u64,
    /// Number of connect events observed.
    pub connections: u64,
    /// Number of disconnect events observed.
    pub disconnections: u64,
}
65
66#[async_trait]
67impl WebSocketMiddleware<ExampleState> for StatisticsMiddleware {
68 async fn on_send(
69 &self,
70 message: &Message,
71 _context: &MiddlewareContext<ExampleState>,
72 ) -> CoreResult<()> {
73 self.messages_sent.fetch_add(1, Ordering::Relaxed);
74
75 let size = match message {
76 Message::Text(text) => text.len() as u64,
77 Message::Binary(data) => data.len() as u64,
78 _ => 0,
79 };
80 self.bytes_sent.fetch_add(size, Ordering::Relaxed);
81
82 info!("Middleware: Sending message (size: {} bytes)", size);
83 Ok(())
84 }
85
86 async fn on_receive(
87 &self,
88 message: &Message,
89 _context: &MiddlewareContext<ExampleState>,
90 ) -> CoreResult<()> {
91 self.messages_received.fetch_add(1, Ordering::Relaxed);
92
93 let size = match message {
94 Message::Text(text) => text.len() as u64,
95 Message::Binary(data) => data.len() as u64,
96 _ => 0,
97 };
98 self.bytes_received.fetch_add(size, Ordering::Relaxed);
99
100 info!("Middleware: Received message (size: {} bytes)", size);
101 Ok(())
102 }
103
104 async fn on_connect(&self, _context: &MiddlewareContext<ExampleState>) -> CoreResult<()> {
105 self.connections.fetch_add(1, Ordering::Relaxed);
106 info!("Middleware: Connected to WebSocket");
107 Ok(())
108 }
109
110 async fn on_disconnect(&self, _context: &MiddlewareContext<ExampleState>) -> CoreResult<()> {
111 self.disconnections.fetch_add(1, Ordering::Relaxed);
112 info!("Middleware: Disconnected from WebSocket");
113 Ok(())
114 }
115}
116
117struct LoggingMiddleware;
119
120#[async_trait]
121impl WebSocketMiddleware<ExampleState> for LoggingMiddleware {
122 async fn on_send(
123 &self,
124 message: &Message,
125 _context: &MiddlewareContext<ExampleState>,
126 ) -> CoreResult<()> {
127 info!("Logging: Sending message: {:?}", message);
128 Ok(())
129 }
130
131 async fn on_receive(
132 &self,
133 message: &Message,
134 _context: &MiddlewareContext<ExampleState>,
135 ) -> CoreResult<()> {
136 info!("Logging: Received message: {:?}", message);
137 Ok(())
138 }
139
140 async fn on_connect(&self, _context: &MiddlewareContext<ExampleState>) -> CoreResult<()> {
141 info!("Logging: WebSocket connected");
142 Ok(())
143 }
144
145 async fn on_disconnect(&self, _context: &MiddlewareContext<ExampleState>) -> CoreResult<()> {
146 info!("Logging: WebSocket disconnected");
147 Ok(())
148 }
149}
150
151struct MockConnector;
153
154#[async_trait]
155impl Connector<ExampleState> for MockConnector {
156 async fn connect(&self, _: Arc<ExampleState>) -> ConnectorResult<WsStream> {
157 Err(
159 binary_options_tools_core_pre::connector::ConnectorError::Custom(
160 "Mock connector".to_string(),
161 ),
162 )
163 }
164
165 async fn disconnect(&self) -> ConnectorResult<()> {
166 Ok(())
167 }
168}
169
170pub struct ExampleModule {
172 _msg_rx: AsyncReceiver<Arc<Message>>,
173}
174
175#[async_trait]
176impl ApiModule<ExampleState> for ExampleModule {
177 type Command = String;
178 type CommandResponse = String;
179 type Handle = ExampleHandle;
180
181 fn new(
182 _state: Arc<ExampleState>,
183 _cmd_rx: AsyncReceiver<Self::Command>,
184 _cmd_ret_tx: AsyncSender<Self::CommandResponse>,
185 msg_rx: AsyncReceiver<Arc<Message>>,
186 _to_ws: AsyncSender<Message>,
187 _: AsyncSender<RunnerCommand>,
188 ) -> Self {
189 Self { _msg_rx: msg_rx }
190 }
191
192 fn create_handle(
193 sender: AsyncSender<Self::Command>,
194 receiver: AsyncReceiver<Self::CommandResponse>,
195 ) -> Self::Handle {
196 ExampleHandle { sender, receiver }
197 }
198
199 async fn run(&mut self) -> CoreResult<()> {
200 info!("Example module running");
202 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
203 Ok(())
204 }
205
206 fn rule(_: Arc<ExampleState>) -> Box<dyn Rule + Send + Sync> {
207 Box::new(move |_msg: &Message| true)
208 }
209}
210
211#[derive(Clone)]
212#[allow(dead_code)]
213pub struct ExampleHandle {
214 sender: AsyncSender<String>,
215 receiver: AsyncReceiver<String>,
216}
217
218#[tokio::main]
219async fn main() -> CoreResult<()> {
220 tracing_subscriber::fmt::init();
222
223 let stats_middleware = Arc::new(StatisticsMiddleware::new());
225
226 let (client, _) = ClientBuilder::new(MockConnector, ExampleState)
228 .with_middleware(Box::new(LoggingMiddleware))
229 .with_middleware(Box::new(StatisticsMiddleware::new()))
230 .with_module::<ExampleModule>()
231 .build()
232 .await?;
233
234 info!("Client built with middleware layers");
235 tokio::time::sleep(Duration::from_secs(10)).await;
236 client.shutdown().await?;
237 let stats = stats_middleware.get_stats();
244 info!("Current statistics: {:?}", stats);
245
246 Ok(())
247}