reifydb_client/http/session/
blocking.rs1use std::{sync::mpsc, time::Duration};
5
6use reifydb_type::{Error, diagnostic::internal};
7
8use crate::{
9 Params,
10 http::{
11 client::HttpClient,
12 session::{HttpChannelResponse, HttpChannelSession, HttpResponseMessage},
13 },
14 session::{CommandResult, QueryResult},
15};
16
17pub struct HttpBlockingSession {
19 channel_session: HttpChannelSession,
20 receiver: mpsc::Receiver<HttpResponseMessage>,
21 authenticated: bool,
22 timeout: Duration,
23}
24
25impl HttpBlockingSession {
26 pub fn new(host: &str, port: u16, token: Option<String>) -> Result<Self, Error> {
28 let client = HttpClient::new((host, port))
29 .map_err(|e| Error(internal(format!("Failed to create client: {}", e))))?;
30 Self::from_client(client, token)
31 }
32
33 pub fn from_client(client: HttpClient, token: Option<String>) -> Result<Self, Error> {
35 let (channel_session, receiver) = HttpChannelSession::from_client(client, token.clone())?;
37
38 let mut session = Self {
39 channel_session,
40 receiver,
41 authenticated: false,
42 timeout: Duration::from_secs(30),
43 };
44
45 if token.is_some() {
47 session.wait_for_auth()?;
48 }
49
50 Ok(session)
51 }
52
53 pub fn from_url(url: &str, token: Option<String>) -> Result<Self, Error> {
55 let client = HttpClient::from_url(url).map_err(|e| Error(internal(format!("Invalid URL: {}", e))))?;
56 Self::from_client(client, token)
57 }
58
59 fn wait_for_auth(&mut self) -> Result<(), Error> {
61 match self.receiver.recv_timeout(self.timeout) {
65 Ok(msg) => match msg.response {
66 Ok(HttpChannelResponse::Auth {
67 ..
68 }) => {
69 self.authenticated = true;
70 Ok(())
71 }
72 Err(e) => Err(e),
73 _ => Err(Error(internal("Unexpected response type during authentication"))),
74 },
75 Err(_) => Err(Error(internal("Authentication timeout"))),
76 }
77 }
78
79 pub fn with_timeout(mut self, timeout: Duration) -> Self {
81 self.timeout = timeout;
82 self
83 }
84
85 pub fn command(&mut self, rql: &str, params: Option<Params>) -> Result<CommandResult, Error> {
87 let request_id = self
89 .channel_session
90 .command(rql, params)
91 .map_err(|e| Error(internal(format!("Failed to send command: {}", e))))?;
92
93 match self.receiver.recv_timeout(self.timeout) {
95 Ok(msg) => {
96 if msg.request_id != request_id {
97 return Err(Error(internal("Received response for wrong request ID")));
98 }
99 match msg.response {
100 Ok(HttpChannelResponse::Command {
101 result,
102 ..
103 }) => Ok(result),
104 Err(e) => Err(e),
105 _ => Err(Error(internal("Unexpected response type for command"))),
106 }
107 }
108 Err(_) => Err(Error(internal("Command timeout"))),
109 }
110 }
111
112 pub fn query(&mut self, rql: &str, params: Option<Params>) -> Result<QueryResult, Error> {
114 let request_id = self
116 .channel_session
117 .query(rql, params)
118 .map_err(|e| Error(internal(format!("Failed to send query: {}", e))))?;
119
120 match self.receiver.recv_timeout(self.timeout) {
122 Ok(msg) => {
123 if msg.request_id != request_id {
124 return Err(Error(internal("Received response for wrong request ID")));
125 }
126 match msg.response {
127 Ok(HttpChannelResponse::Query {
128 result,
129 ..
130 }) => Ok(result),
131 Err(e) => Err(e),
132 _ => Err(Error(internal("Unexpected response type for query"))),
133 }
134 }
135 Err(_) => Err(Error(internal("Query timeout"))),
136 }
137 }
138
139 pub fn is_authenticated(&self) -> bool {
141 self.authenticated
142 }
143}