reifydb_client/ws/session/
callback.rs1use std::{
5 sync::{Arc, Mutex, mpsc},
6 thread,
7 time::Duration,
8};
9
10use crate::{
11 Params,
12 session::{CommandResult, QueryResult},
13 ws::{
14 client::ClientInner,
15 session::{ChannelResponse, ChannelSession, ResponseMessage},
16 },
17};
18
19pub struct CallbackSession {
21 channel_session: Arc<ChannelSession>,
22 receiver: Arc<Mutex<mpsc::Receiver<ResponseMessage>>>,
23 authenticated: Arc<Mutex<bool>>,
24}
25
26impl CallbackSession {
27 pub(crate) fn new(client: Arc<ClientInner>, token: Option<String>) -> Result<Self, reifydb_type::Error> {
29 let (channel_session, receiver) = ChannelSession::new(client, token.clone())?;
31
32 let channel_session = Arc::new(channel_session);
33 let receiver = Arc::new(Mutex::new(receiver));
34 let authenticated = Arc::new(Mutex::new(false));
35
36 if token.is_some() {
38 match receiver.lock().unwrap().recv_timeout(Duration::from_millis(500)) {
40 Ok(msg) => {
41 match msg.response {
42 Ok(ChannelResponse::Auth {
43 ..
44 }) => {
45 *authenticated.lock().unwrap() = true;
46 println!("WebSocket Authentication successful");
47 }
48 Err(e) => {
49 eprintln!(
53 "WebSocket Authentication error (continuing anyway): {}",
54 e
55 );
56 *authenticated.lock().unwrap() = true;
57 }
58 _ => {
59 eprintln!(
63 "Warning: Expected auth response but got: {:?}",
64 msg.response
65 );
66 }
67 }
68 }
69 Err(_) => {
70 println!("WebSocket session created with token (no auth response received)");
73 *authenticated.lock().unwrap() = true;
74 }
75 }
76 }
77
78 Ok(Self {
79 channel_session,
80 receiver,
81 authenticated,
82 })
83 }
84
85 pub fn command<F>(&self, rql: &str, params: Option<Params>, callback: F) -> Result<String, reifydb_type::Error>
87 where
88 F: FnOnce(Result<CommandResult, reifydb_type::Error>) + Send + 'static,
89 {
90 let request_id = self.channel_session.command(rql, params).map_err(|e| {
92 reifydb_type::Error(reifydb_type::diagnostic::internal(format!(
93 "Failed to send command: {}",
94 e
95 )))
96 })?;
97
98 let receiver = self.receiver.clone();
101 let request_id_clone = request_id.clone();
102 thread::spawn(move || {
103 match receiver.lock().unwrap().recv_timeout(Duration::from_secs(30)) {
105 Ok(msg) => {
106 if msg.request_id == request_id_clone {
107 match msg.response {
108 Ok(ChannelResponse::Command {
109 result,
110 ..
111 }) => {
112 callback(Ok(result));
113 }
114 Err(e) => {
115 callback(Err(e));
116 }
117 _ => {
118 callback(Err(reifydb_type::Error(
119 reifydb_type::diagnostic::internal(
120 "Unexpected response type for command"
121 .to_string(),
122 ),
123 )));
124 }
125 }
126 }
127 }
128 Err(mpsc::RecvTimeoutError::Timeout) => {
129 callback(Err(reifydb_type::Error(reifydb_type::diagnostic::internal(
130 "Command request timeout".to_string(),
131 ))));
132 }
133 Err(mpsc::RecvTimeoutError::Disconnected) => {
134 callback(Err(reifydb_type::Error(reifydb_type::diagnostic::internal(
135 "Command channel disconnected".to_string(),
136 ))));
137 }
138 }
139 });
140
141 Ok(request_id)
142 }
143
144 pub fn query<F>(&self, rql: &str, params: Option<Params>, callback: F) -> Result<String, reifydb_type::Error>
146 where
147 F: FnOnce(Result<QueryResult, reifydb_type::Error>) + Send + 'static,
148 {
149 let request_id = self.channel_session.query(rql, params).map_err(|e| {
151 reifydb_type::Error(reifydb_type::diagnostic::internal(format!("Failed to send query: {}", e)))
152 })?;
153
154 let receiver = self.receiver.clone();
157 let request_id_clone = request_id.clone();
158 thread::spawn(move || {
159 match receiver.lock().unwrap().recv_timeout(Duration::from_secs(30)) {
161 Ok(msg) => {
162 if msg.request_id == request_id_clone {
163 match msg.response {
164 Ok(ChannelResponse::Query {
165 result,
166 ..
167 }) => {
168 callback(Ok(result));
169 }
170 Err(e) => {
171 callback(Err(e));
172 }
173 _ => {
174 callback(Err(reifydb_type::Error(
175 reifydb_type::diagnostic::internal(
176 "Unexpected response type for query"
177 .to_string(),
178 ),
179 )));
180 }
181 }
182 }
183 }
184 Err(mpsc::RecvTimeoutError::Timeout) => {
185 callback(Err(reifydb_type::Error(reifydb_type::diagnostic::internal(
186 "Query request timeout".to_string(),
187 ))));
188 }
189 Err(mpsc::RecvTimeoutError::Disconnected) => {
190 callback(Err(reifydb_type::Error(reifydb_type::diagnostic::internal(
191 "Query channel disconnected".to_string(),
192 ))));
193 }
194 }
195 });
196
197 Ok(request_id)
198 }
199
200 pub fn is_authenticated(&self) -> bool {
202 *self.authenticated.lock().unwrap()
203 }
204}
205
206impl Drop for CallbackSession {
207 fn drop(&mut self) {
208 }
210}
211
212pub trait ResponseHandler: Send {
214 fn on_success(&mut self, result: CommandResult);
215 fn on_error(&mut self, error: String);
216 fn on_timeout(&mut self) {}
217}
218
219pub trait QueryHandler: Send {
221 fn on_success(&mut self, result: QueryResult);
222 fn on_error(&mut self, error: String);
223 fn on_timeout(&mut self) {}
224}
225
226impl CallbackSession {
227 pub fn command_with_handler(
229 &self,
230 rql: &str,
231 params: Option<Params>,
232 mut handler: impl ResponseHandler + 'static,
233 ) -> Result<String, reifydb_type::Error> {
234 self.command(rql, params, move |result| match result {
235 Ok(data) => handler.on_success(data),
236 Err(e) => handler.on_error(e.to_string()),
237 })
238 }
239
240 pub fn query_with_handler(
242 &self,
243 rql: &str,
244 params: Option<Params>,
245 mut handler: impl QueryHandler + 'static,
246 ) -> Result<String, reifydb_type::Error> {
247 self.query(rql, params, move |result| match result {
248 Ok(data) => handler.on_success(data),
249 Err(e) => handler.on_error(e.to_string()),
250 })
251 }
252}