reifydb_client/http/session/
blocking.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the MIT
3
4use std::{sync::mpsc, time::Duration};
5
6use reifydb_type::{Error, diagnostic::internal};
7
8use crate::{
9	Params,
10	http::{
11		client::HttpClient,
12		session::{HttpChannelResponse, HttpChannelSession, HttpResponseMessage},
13	},
14	session::{CommandResult, QueryResult},
15};
16
17/// A blocking HTTP session that waits for responses synchronously
18pub struct HttpBlockingSession {
19	channel_session: HttpChannelSession,
20	receiver: mpsc::Receiver<HttpResponseMessage>,
21	authenticated: bool,
22	timeout: Duration,
23}
24
25impl HttpBlockingSession {
26	/// Create a new blocking HTTP session
27	pub fn new(host: &str, port: u16, token: Option<String>) -> Result<Self, Error> {
28		let client = HttpClient::new((host, port))
29			.map_err(|e| Error(internal(format!("Failed to create client: {}", e))))?;
30		Self::from_client(client, token)
31	}
32
33	/// Create a new blocking HTTP session from an existing client
34	pub fn from_client(client: HttpClient, token: Option<String>) -> Result<Self, Error> {
35		// Create a channel session and get the receiver
36		let (channel_session, receiver) = HttpChannelSession::from_client(client, token.clone())?;
37
38		let mut session = Self {
39			channel_session,
40			receiver,
41			authenticated: false,
42			timeout: Duration::from_secs(30),
43		};
44
45		// If token provided, wait for authentication response
46		if token.is_some() {
47			session.wait_for_auth()?;
48		}
49
50		Ok(session)
51	}
52
53	/// Create from URL (e.g., "http://localhost:8080")
54	pub fn from_url(url: &str, token: Option<String>) -> Result<Self, Error> {
55		let client = HttpClient::from_url(url).map_err(|e| Error(internal(format!("Invalid URL: {}", e))))?;
56		Self::from_client(client, token)
57	}
58
59	/// Wait for authentication response
60	fn wait_for_auth(&mut self) -> Result<(), Error> {
61		// Authentication was already initiated by
62		// HttpChannelSession::from_client We just need to wait for
63		// the response
64		match self.receiver.recv_timeout(self.timeout) {
65			Ok(msg) => match msg.response {
66				Ok(HttpChannelResponse::Auth {
67					..
68				}) => {
69					self.authenticated = true;
70					Ok(())
71				}
72				Err(e) => Err(e),
73				_ => Err(Error(internal("Unexpected response type during authentication"))),
74			},
75			Err(_) => Err(Error(internal("Authentication timeout"))),
76		}
77	}
78
79	/// Set timeout for requests
80	pub fn with_timeout(mut self, timeout: Duration) -> Self {
81		self.timeout = timeout;
82		self
83	}
84
85	/// Send a command and wait for response
86	pub fn command(&mut self, rql: &str, params: Option<Params>) -> Result<CommandResult, Error> {
87		// Send command through channel session
88		let request_id = self
89			.channel_session
90			.command(rql, params)
91			.map_err(|e| Error(internal(format!("Failed to send command: {}", e))))?;
92
93		// Wait for response
94		match self.receiver.recv_timeout(self.timeout) {
95			Ok(msg) => {
96				if msg.request_id != request_id {
97					return Err(Error(internal("Received response for wrong request ID")));
98				}
99				match msg.response {
100					Ok(HttpChannelResponse::Command {
101						result,
102						..
103					}) => Ok(result),
104					Err(e) => Err(e),
105					_ => Err(Error(internal("Unexpected response type for command"))),
106				}
107			}
108			Err(_) => Err(Error(internal("Command timeout"))),
109		}
110	}
111
112	/// Send a query and wait for response
113	pub fn query(&mut self, rql: &str, params: Option<Params>) -> Result<QueryResult, Error> {
114		// Send query through channel session
115		let request_id = self
116			.channel_session
117			.query(rql, params)
118			.map_err(|e| Error(internal(format!("Failed to send query: {}", e))))?;
119
120		// Wait for response
121		match self.receiver.recv_timeout(self.timeout) {
122			Ok(msg) => {
123				if msg.request_id != request_id {
124					return Err(Error(internal("Received response for wrong request ID")));
125				}
126				match msg.response {
127					Ok(HttpChannelResponse::Query {
128						result,
129						..
130					}) => Ok(result),
131					Err(e) => Err(e),
132					_ => Err(Error(internal("Unexpected response type for query"))),
133				}
134			}
135			Err(_) => Err(Error(internal("Query timeout"))),
136		}
137	}
138
139	/// Check if the session is authenticated
140	pub fn is_authenticated(&self) -> bool {
141		self.authenticated
142	}
143}