reifydb_client/http/session/
callback.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the MIT
3
4use std::{
5	sync::{Arc, Mutex, mpsc},
6	thread,
7	time::Duration,
8};
9
10use reifydb_type::{Error, diagnostic::internal};
11
12use crate::{
13	Params,
14	http::{
15		client::HttpClient,
16		session::{HttpChannelResponse, HttpChannelSession, HttpResponseMessage},
17	},
18	session::{CommandResult, QueryResult},
19};
20
21/// A callback-based HTTP session for asynchronous operations
22pub struct HttpCallbackSession {
23	channel_session: Arc<HttpChannelSession>,
24	receiver: Arc<Mutex<mpsc::Receiver<HttpResponseMessage>>>,
25	authenticated: Arc<Mutex<bool>>,
26}
27
28impl HttpCallbackSession {
29	/// Create a new callback HTTP session
30	pub fn new(host: &str, port: u16, token: Option<String>) -> Result<Self, Error> {
31		let client = HttpClient::new((host, port)).map_err(|e| {
32			reifydb_type::Error(reifydb_type::diagnostic::internal(format!(
33				"Failed to create client: {}",
34				e
35			)))
36		})?;
37		Self::from_client(client, token)
38	}
39
40	/// Create a new callback HTTP session from an existing client
41	pub fn from_client(client: HttpClient, token: Option<String>) -> Result<Self, Error> {
42		// Create a channel session and get the receiver
43		let (channel_session, receiver) = HttpChannelSession::from_client(client, token.clone())?;
44
45		let channel_session = Arc::new(channel_session);
46		let receiver = Arc::new(Mutex::new(receiver));
47		let authenticated = Arc::new(Mutex::new(false));
48
49		// If token provided, consume the authentication response
50		if token.is_some() {
51			// Try to receive the auth response with a short timeout
52			match receiver.lock().unwrap().recv_timeout(Duration::from_millis(500)) {
53				Ok(msg) => {
54					match msg.response {
55						Ok(HttpChannelResponse::Auth {
56							..
57						}) => {
58							*authenticated.lock().unwrap() = true;
59							println!("HTTP Authentication successful");
60						}
61						Err(e) => {
62							// Authentication failed, but we'll continue anyway
63							eprintln!(
64								"HTTP Authentication error (continuing anyway): {}",
65								e
66							);
67							*authenticated.lock().unwrap() = true;
68						}
69						_ => {
70							// Not an auth response - this shouldn't happen
71							eprintln!(
72								"Warning: Expected auth response but got: {:?}",
73								msg.response
74							);
75						}
76					}
77				}
78				Err(_) => {
79					// Timeout or disconnected - continue
80					// anyway
81					println!("HTTP session created with token (no auth response received)");
82					*authenticated.lock().unwrap() = true;
83				}
84			}
85		}
86
87		Ok(Self {
88			channel_session,
89			receiver,
90			authenticated,
91		})
92	}
93
94	/// Create from URL (e.g., "http://localhost:8080")
95	pub fn from_url(url: &str, token: Option<String>) -> Result<Self, Error> {
96		let client = HttpClient::from_url(url).map_err(|e| Error(internal(format!("Invalid URL: {}", e))))?;
97		Self::from_client(client, token)
98	}
99
100	/// Set timeout for requests
101	pub fn with_timeout(self, _timeout: Duration) -> Self {
102		// This would need to be implemented at the channel session
103		// level For now, just return self
104		self
105	}
106
107	/// Send a command with callback
108	pub fn command<F>(&self, rql: &str, params: Option<Params>, callback: F) -> Result<String, Error>
109	where
110		F: FnOnce(Result<CommandResult, Error>) + Send + 'static,
111	{
112		// Send command through channel session
113		let request_id = self
114			.channel_session
115			.command(rql, params)
116			.map_err(|e| Error(internal(format!("Failed to send command: {}", e))))?;
117
118		// Spawn thread to wait for response and invoke callback with
119		// timeout
120		let receiver = self.receiver.clone();
121		let request_id_clone = request_id.clone();
122		thread::spawn(move || {
123			// Wait up to 30 seconds for response
124			match receiver.lock().unwrap().recv_timeout(Duration::from_secs(30)) {
125				Ok(msg) => {
126					if msg.request_id == request_id_clone {
127						match msg.response {
128							Ok(HttpChannelResponse::Command {
129								result,
130								..
131							}) => {
132								callback(Ok(result));
133							}
134							Err(e) => {
135								callback(Err(e));
136							}
137							_ => {
138								callback(Err(Error(internal(
139									"Unexpected response type for command"
140										.to_string(),
141								))));
142							}
143						}
144					}
145				}
146				Err(mpsc::RecvTimeoutError::Timeout) => {
147					callback(Err(Error(internal("Command request timeout".to_string()))));
148				}
149				Err(mpsc::RecvTimeoutError::Disconnected) => {
150					callback(Err(Error(internal("Command channel disconnected".to_string()))));
151				}
152			}
153		});
154
155		Ok(request_id)
156	}
157
158	/// Send a query with callback
159	pub fn query<F>(&self, rql: &str, params: Option<Params>, callback: F) -> Result<String, Error>
160	where
161		F: FnOnce(Result<QueryResult, Error>) + Send + 'static,
162	{
163		// Send query through channel session
164		let request_id = self
165			.channel_session
166			.query(rql, params)
167			.map_err(|e| Error(internal(format!("Failed to send query: {}", e))))?;
168
169		// Spawn thread to wait for response and invoke callback with
170		// timeout
171		let receiver = self.receiver.clone();
172		let request_id_clone = request_id.clone();
173		thread::spawn(move || {
174			// Wait up to 30 seconds for response
175			match receiver.lock().unwrap().recv_timeout(Duration::from_secs(30)) {
176				Ok(msg) => {
177					if msg.request_id == request_id_clone {
178						match msg.response {
179							Ok(HttpChannelResponse::Query {
180								result,
181								..
182							}) => {
183								callback(Ok(result));
184							}
185							Err(e) => {
186								callback(Err(e));
187							}
188							_ => {
189								callback(Err(Error(internal(
190									"Unexpected response type for query"
191										.to_string(),
192								))));
193							}
194						}
195					}
196				}
197				Err(mpsc::RecvTimeoutError::Timeout) => {
198					callback(Err(Error(internal("Query request timeout".to_string()))));
199				}
200				Err(mpsc::RecvTimeoutError::Disconnected) => {
201					callback(Err(Error(internal("Query channel disconnected".to_string()))));
202				}
203			}
204		});
205
206		Ok(request_id)
207	}
208
209	/// Check if the session is authenticated
210	pub fn is_authenticated(&self) -> bool {
211		*self.authenticated.lock().unwrap()
212	}
213}
214
215/// Trait for structured callback handling (same as WebSocket)
216pub trait HttpResponseHandler: Send {
217	fn on_success(&mut self, result: CommandResult);
218	fn on_error(&mut self, error: String);
219	fn on_timeout(&mut self) {}
220}
221
222/// Trait for query response handling (same as WebSocket)
223pub trait HttpQueryHandler: Send {
224	fn on_success(&mut self, result: QueryResult);
225	fn on_error(&mut self, error: String);
226	fn on_timeout(&mut self) {}
227}
228
229impl HttpCallbackSession {
230	/// Execute command with a response handler
231	pub fn command_with_handler(
232		&self,
233		rql: &str,
234		params: Option<Params>,
235		mut handler: impl HttpResponseHandler + 'static,
236	) -> Result<String, Error> {
237		self.command(rql, params, move |result| match result {
238			Ok(data) => handler.on_success(data),
239			Err(e) => handler.on_error(e.to_string()),
240		})
241	}
242
243	/// Execute query with a response handler
244	pub fn query_with_handler(
245		&self,
246		rql: &str,
247		params: Option<Params>,
248		mut handler: impl HttpQueryHandler + 'static,
249	) -> Result<String, Error> {
250		self.query(rql, params, move |result| match result {
251			Ok(data) => handler.on_success(data),
252			Err(e) => handler.on_error(e.to_string()),
253		})
254	}
255}
256
257impl Drop for HttpCallbackSession {
258	fn drop(&mut self) {
259		// No cleanup needed since we don't have a worker thread anymore
260	}
261}