// reifydb_client/http/session/callback.rs

use std::{
    sync::{Arc, Mutex, TryLockError, mpsc},
    thread,
    time::{Duration, Instant},
};

use reifydb_type::{Error, diagnostic::internal};

use crate::{
    Params,
    http::{
        client::HttpClient,
        session::{HttpChannelResponse, HttpChannelSession, HttpResponseMessage},
    },
    session::{CommandResult, QueryResult},
};
20
21pub struct HttpCallbackSession {
23 channel_session: Arc<HttpChannelSession>,
24 receiver: Arc<Mutex<mpsc::Receiver<HttpResponseMessage>>>,
25 authenticated: Arc<Mutex<bool>>,
26}
27
28impl HttpCallbackSession {
29 pub fn new(host: &str, port: u16, token: Option<String>) -> Result<Self, Error> {
31 let client = HttpClient::new((host, port)).map_err(|e| {
32 reifydb_type::Error(reifydb_type::diagnostic::internal(format!(
33 "Failed to create client: {}",
34 e
35 )))
36 })?;
37 Self::from_client(client, token)
38 }
39
40 pub fn from_client(client: HttpClient, token: Option<String>) -> Result<Self, Error> {
42 let (channel_session, receiver) = HttpChannelSession::from_client(client, token.clone())?;
44
45 let channel_session = Arc::new(channel_session);
46 let receiver = Arc::new(Mutex::new(receiver));
47 let authenticated = Arc::new(Mutex::new(false));
48
49 if token.is_some() {
51 match receiver.lock().unwrap().recv_timeout(Duration::from_millis(500)) {
53 Ok(msg) => {
54 match msg.response {
55 Ok(HttpChannelResponse::Auth {
56 ..
57 }) => {
58 *authenticated.lock().unwrap() = true;
59 println!("HTTP Authentication successful");
60 }
61 Err(e) => {
62 eprintln!(
64 "HTTP Authentication error (continuing anyway): {}",
65 e
66 );
67 *authenticated.lock().unwrap() = true;
68 }
69 _ => {
70 eprintln!(
72 "Warning: Expected auth response but got: {:?}",
73 msg.response
74 );
75 }
76 }
77 }
78 Err(_) => {
79 println!("HTTP session created with token (no auth response received)");
82 *authenticated.lock().unwrap() = true;
83 }
84 }
85 }
86
87 Ok(Self {
88 channel_session,
89 receiver,
90 authenticated,
91 })
92 }
93
94 pub fn from_url(url: &str, token: Option<String>) -> Result<Self, Error> {
96 let client = HttpClient::from_url(url).map_err(|e| Error(internal(format!("Invalid URL: {}", e))))?;
97 Self::from_client(client, token)
98 }
99
100 pub fn with_timeout(self, _timeout: Duration) -> Self {
102 self
105 }
106
107 pub fn command<F>(&self, rql: &str, params: Option<Params>, callback: F) -> Result<String, Error>
109 where
110 F: FnOnce(Result<CommandResult, Error>) + Send + 'static,
111 {
112 let request_id = self
114 .channel_session
115 .command(rql, params)
116 .map_err(|e| Error(internal(format!("Failed to send command: {}", e))))?;
117
118 let receiver = self.receiver.clone();
121 let request_id_clone = request_id.clone();
122 thread::spawn(move || {
123 match receiver.lock().unwrap().recv_timeout(Duration::from_secs(30)) {
125 Ok(msg) => {
126 if msg.request_id == request_id_clone {
127 match msg.response {
128 Ok(HttpChannelResponse::Command {
129 result,
130 ..
131 }) => {
132 callback(Ok(result));
133 }
134 Err(e) => {
135 callback(Err(e));
136 }
137 _ => {
138 callback(Err(Error(internal(
139 "Unexpected response type for command"
140 .to_string(),
141 ))));
142 }
143 }
144 }
145 }
146 Err(mpsc::RecvTimeoutError::Timeout) => {
147 callback(Err(Error(internal("Command request timeout".to_string()))));
148 }
149 Err(mpsc::RecvTimeoutError::Disconnected) => {
150 callback(Err(Error(internal("Command channel disconnected".to_string()))));
151 }
152 }
153 });
154
155 Ok(request_id)
156 }
157
158 pub fn query<F>(&self, rql: &str, params: Option<Params>, callback: F) -> Result<String, Error>
160 where
161 F: FnOnce(Result<QueryResult, Error>) + Send + 'static,
162 {
163 let request_id = self
165 .channel_session
166 .query(rql, params)
167 .map_err(|e| Error(internal(format!("Failed to send query: {}", e))))?;
168
169 let receiver = self.receiver.clone();
172 let request_id_clone = request_id.clone();
173 thread::spawn(move || {
174 match receiver.lock().unwrap().recv_timeout(Duration::from_secs(30)) {
176 Ok(msg) => {
177 if msg.request_id == request_id_clone {
178 match msg.response {
179 Ok(HttpChannelResponse::Query {
180 result,
181 ..
182 }) => {
183 callback(Ok(result));
184 }
185 Err(e) => {
186 callback(Err(e));
187 }
188 _ => {
189 callback(Err(Error(internal(
190 "Unexpected response type for query"
191 .to_string(),
192 ))));
193 }
194 }
195 }
196 }
197 Err(mpsc::RecvTimeoutError::Timeout) => {
198 callback(Err(Error(internal("Query request timeout".to_string()))));
199 }
200 Err(mpsc::RecvTimeoutError::Disconnected) => {
201 callback(Err(Error(internal("Query channel disconnected".to_string()))));
202 }
203 }
204 });
205
206 Ok(request_id)
207 }
208
209 pub fn is_authenticated(&self) -> bool {
211 *self.authenticated.lock().unwrap()
212 }
213}
214
215pub trait HttpResponseHandler: Send {
217 fn on_success(&mut self, result: CommandResult);
218 fn on_error(&mut self, error: String);
219 fn on_timeout(&mut self) {}
220}
221
222pub trait HttpQueryHandler: Send {
224 fn on_success(&mut self, result: QueryResult);
225 fn on_error(&mut self, error: String);
226 fn on_timeout(&mut self) {}
227}
228
229impl HttpCallbackSession {
230 pub fn command_with_handler(
232 &self,
233 rql: &str,
234 params: Option<Params>,
235 mut handler: impl HttpResponseHandler + 'static,
236 ) -> Result<String, Error> {
237 self.command(rql, params, move |result| match result {
238 Ok(data) => handler.on_success(data),
239 Err(e) => handler.on_error(e.to_string()),
240 })
241 }
242
243 pub fn query_with_handler(
245 &self,
246 rql: &str,
247 params: Option<Params>,
248 mut handler: impl HttpQueryHandler + 'static,
249 ) -> Result<String, Error> {
250 self.query(rql, params, move |result| match result {
251 Ok(data) => handler.on_success(data),
252 Err(e) => handler.on_error(e.to_string()),
253 })
254 }
255}
256
257impl Drop for HttpCallbackSession {
258 fn drop(&mut self) {
259 }
261}