reifydb_client/ws/session/
blocking.rs1use std::{
5 sync::{Arc, mpsc},
6 time::Duration,
7};
8
9use reifydb_type::Error;
10
11use crate::{
12 Params,
13 session::{CommandResult, QueryResult},
14 ws::{
15 client::ClientInner,
16 session::{ChannelResponse, ChannelSession, ResponseMessage},
17 },
18};
19
20pub struct BlockingSession {
22 channel_session: ChannelSession,
23 receiver: mpsc::Receiver<ResponseMessage>,
24 authenticated: bool,
25 timeout: Duration,
26}
27
28impl BlockingSession {
29 pub(crate) fn new(client: Arc<ClientInner>, token: Option<String>) -> Result<Self, Error> {
31 let (channel_session, receiver) = ChannelSession::new(client, token.clone())?;
33
34 let mut session = Self {
35 channel_session,
36 receiver,
37 authenticated: false,
38 timeout: Duration::from_secs(30),
39 };
40
41 if token.is_some() {
43 session.wait_for_auth()?;
44 }
45
46 Ok(session)
47 }
48
49 fn wait_for_auth(&mut self) -> Result<(), Error> {
51 match self.receiver.recv_timeout(self.timeout) {
54 Ok(msg) => match msg.response {
55 Ok(ChannelResponse::Auth {
56 ..
57 }) => {
58 self.authenticated = true;
59 Ok(())
60 }
61 Err(e) => Err(e),
62 _ => panic!("Unexpected response type during authentication"),
63 },
64 Err(_) => panic!("Authentication timeout"),
65 }
66 }
67
68 pub fn command(&mut self, rql: &str, params: Option<Params>) -> Result<CommandResult, Error> {
70 let request_id = self.channel_session.command(rql, params).map_err(|e| {
72 reifydb_type::Error(reifydb_type::diagnostic::internal(format!(
73 "Failed to send command: {}",
74 e
75 )))
76 })?;
77
78 match self.receiver.recv_timeout(self.timeout) {
80 Ok(msg) => {
81 if msg.request_id != request_id {
82 panic!("Received response for wrong request ID");
83 }
84 match msg.response {
85 Ok(ChannelResponse::Command {
86 result,
87 ..
88 }) => Ok(result),
89 Err(e) => Err(e),
90 _ => panic!("Unexpected response type for command"),
91 }
92 }
93 Err(_) => panic!("Command timeout"),
94 }
95 }
96
97 pub fn query(&mut self, rql: &str, params: Option<Params>) -> Result<QueryResult, Error> {
99 let request_id = self.channel_session.query(rql, params).map_err(|e| {
101 reifydb_type::Error(reifydb_type::diagnostic::internal(format!("Failed to send query: {}", e)))
102 })?;
103
104 match self.receiver.recv_timeout(self.timeout) {
106 Ok(msg) => {
107 if msg.request_id != request_id {
108 panic!("Received response for wrong request ID");
109 }
110 match msg.response {
111 Ok(ChannelResponse::Query {
112 result,
113 ..
114 }) => Ok(result),
115 Err(e) => Err(e),
116 _ => panic!("Unexpected response type for query"),
117 }
118 }
119 Err(_) => panic!("Query timeout"),
120 }
121 }
122
123 pub fn is_authenticated(&self) -> bool {
125 self.authenticated
126 }
127}