reifydb_client/ws/session/
callback.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the MIT
3
use std::{
	sync::{Arc, Mutex, TryLockError, mpsc},
	thread,
	time::{Duration, Instant},
};
9
10use crate::{
11	Params,
12	session::{CommandResult, QueryResult},
13	ws::{
14		client::ClientInner,
15		session::{ChannelResponse, ChannelSession, ResponseMessage},
16	},
17};
18
19/// A callback-based session for asynchronous operations
20pub struct CallbackSession {
21	channel_session: Arc<ChannelSession>,
22	receiver: Arc<Mutex<mpsc::Receiver<ResponseMessage>>>,
23	authenticated: Arc<Mutex<bool>>,
24}
25
26impl CallbackSession {
27	/// Create a new callback session
28	pub(crate) fn new(client: Arc<ClientInner>, token: Option<String>) -> Result<Self, reifydb_type::Error> {
29		// Create a channel session and get the receiver
30		let (channel_session, receiver) = ChannelSession::new(client, token.clone())?;
31
32		let channel_session = Arc::new(channel_session);
33		let receiver = Arc::new(Mutex::new(receiver));
34		let authenticated = Arc::new(Mutex::new(false));
35
36		// If token provided, consume the authentication response
37		if token.is_some() {
38			// Try to receive the auth response with a short timeout
39			match receiver.lock().unwrap().recv_timeout(Duration::from_millis(500)) {
40				Ok(msg) => {
41					match msg.response {
42						Ok(ChannelResponse::Auth {
43							..
44						}) => {
45							*authenticated.lock().unwrap() = true;
46							println!("WebSocket Authentication successful");
47						}
48						Err(e) => {
49							// Authentication
50							// failed, but we'll
51							// continue anyway
52							eprintln!(
53								"WebSocket Authentication error (continuing anyway): {}",
54								e
55							);
56							*authenticated.lock().unwrap() = true;
57						}
58						_ => {
59							// Not an auth response
60							// - this shouldn't
61							// happen
62							eprintln!(
63								"Warning: Expected auth response but got: {:?}",
64								msg.response
65							);
66						}
67					}
68				}
69				Err(_) => {
70					// Timeout or disconnected - continue
71					// anyway
72					println!("WebSocket session created with token (no auth response received)");
73					*authenticated.lock().unwrap() = true;
74				}
75			}
76		}
77
78		Ok(Self {
79			channel_session,
80			receiver,
81			authenticated,
82		})
83	}
84
85	/// Send a command with callback
86	pub fn command<F>(&self, rql: &str, params: Option<Params>, callback: F) -> Result<String, reifydb_type::Error>
87	where
88		F: FnOnce(Result<CommandResult, reifydb_type::Error>) + Send + 'static,
89	{
90		// Send command through channel session
91		let request_id = self.channel_session.command(rql, params).map_err(|e| {
92			reifydb_type::Error(reifydb_type::diagnostic::internal(format!(
93				"Failed to send command: {}",
94				e
95			)))
96		})?;
97
98		// Spawn thread to wait for response and invoke callback with
99		// timeout
100		let receiver = self.receiver.clone();
101		let request_id_clone = request_id.clone();
102		thread::spawn(move || {
103			// Wait up to 30 seconds for response
104			match receiver.lock().unwrap().recv_timeout(Duration::from_secs(30)) {
105				Ok(msg) => {
106					if msg.request_id == request_id_clone {
107						match msg.response {
108							Ok(ChannelResponse::Command {
109								result,
110								..
111							}) => {
112								callback(Ok(result));
113							}
114							Err(e) => {
115								callback(Err(e));
116							}
117							_ => {
118								callback(Err(reifydb_type::Error(
119									reifydb_type::diagnostic::internal(
120										"Unexpected response type for command"
121											.to_string(),
122									),
123								)));
124							}
125						}
126					}
127				}
128				Err(mpsc::RecvTimeoutError::Timeout) => {
129					callback(Err(reifydb_type::Error(reifydb_type::diagnostic::internal(
130						"Command request timeout".to_string(),
131					))));
132				}
133				Err(mpsc::RecvTimeoutError::Disconnected) => {
134					callback(Err(reifydb_type::Error(reifydb_type::diagnostic::internal(
135						"Command channel disconnected".to_string(),
136					))));
137				}
138			}
139		});
140
141		Ok(request_id)
142	}
143
144	/// Send a query with callback
145	pub fn query<F>(&self, rql: &str, params: Option<Params>, callback: F) -> Result<String, reifydb_type::Error>
146	where
147		F: FnOnce(Result<QueryResult, reifydb_type::Error>) + Send + 'static,
148	{
149		// Send query through channel session
150		let request_id = self.channel_session.query(rql, params).map_err(|e| {
151			reifydb_type::Error(reifydb_type::diagnostic::internal(format!("Failed to send query: {}", e)))
152		})?;
153
154		// Spawn thread to wait for response and invoke callback with
155		// timeout
156		let receiver = self.receiver.clone();
157		let request_id_clone = request_id.clone();
158		thread::spawn(move || {
159			// Wait up to 30 seconds for response
160			match receiver.lock().unwrap().recv_timeout(Duration::from_secs(30)) {
161				Ok(msg) => {
162					if msg.request_id == request_id_clone {
163						match msg.response {
164							Ok(ChannelResponse::Query {
165								result,
166								..
167							}) => {
168								callback(Ok(result));
169							}
170							Err(e) => {
171								callback(Err(e));
172							}
173							_ => {
174								callback(Err(reifydb_type::Error(
175									reifydb_type::diagnostic::internal(
176										"Unexpected response type for query"
177											.to_string(),
178									),
179								)));
180							}
181						}
182					}
183				}
184				Err(mpsc::RecvTimeoutError::Timeout) => {
185					callback(Err(reifydb_type::Error(reifydb_type::diagnostic::internal(
186						"Query request timeout".to_string(),
187					))));
188				}
189				Err(mpsc::RecvTimeoutError::Disconnected) => {
190					callback(Err(reifydb_type::Error(reifydb_type::diagnostic::internal(
191						"Query channel disconnected".to_string(),
192					))));
193				}
194			}
195		});
196
197		Ok(request_id)
198	}
199
200	/// Check if the session is authenticated
201	pub fn is_authenticated(&self) -> bool {
202		*self.authenticated.lock().unwrap()
203	}
204}
205
206impl Drop for CallbackSession {
207	fn drop(&mut self) {
208		// No cleanup needed since we don't have a worker thread anymore
209	}
210}
211
212/// Trait for structured callback handling
213pub trait ResponseHandler: Send {
214	fn on_success(&mut self, result: CommandResult);
215	fn on_error(&mut self, error: String);
216	fn on_timeout(&mut self) {}
217}
218
219/// Trait for query response handling
220pub trait QueryHandler: Send {
221	fn on_success(&mut self, result: QueryResult);
222	fn on_error(&mut self, error: String);
223	fn on_timeout(&mut self) {}
224}
225
226impl CallbackSession {
227	/// Execute command with a response handler
228	pub fn command_with_handler(
229		&self,
230		rql: &str,
231		params: Option<Params>,
232		mut handler: impl ResponseHandler + 'static,
233	) -> Result<String, reifydb_type::Error> {
234		self.command(rql, params, move |result| match result {
235			Ok(data) => handler.on_success(data),
236			Err(e) => handler.on_error(e.to_string()),
237		})
238	}
239
240	/// Execute query with a response handler
241	pub fn query_with_handler(
242		&self,
243		rql: &str,
244		params: Option<Params>,
245		mut handler: impl QueryHandler + 'static,
246	) -> Result<String, reifydb_type::Error> {
247		self.query(rql, params, move |result| match result {
248			Ok(data) => handler.on_success(data),
249			Err(e) => handler.on_error(e.to_string()),
250		})
251	}
252}