reifydb_client/ws/session/
blocking.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the MIT
3
4use std::{
5	sync::{Arc, mpsc},
6	time::Duration,
7};
8
9use reifydb_type::Error;
10
11use crate::{
12	Params,
13	session::{CommandResult, QueryResult},
14	ws::{
15		client::ClientInner,
16		session::{ChannelResponse, ChannelSession, ResponseMessage},
17	},
18};
19
/// A blocking session that waits for responses synchronously.
///
/// Requests are sent through the wrapped `ChannelSession`; the calling
/// thread then waits on `receiver` for at most `timeout` per response.
pub struct BlockingSession {
	/// Channel session used to send commands and queries.
	channel_session: ChannelSession,
	/// Receiving end on which response messages for this session arrive.
	receiver: mpsc::Receiver<ResponseMessage>,
	/// Set to `true` once an authentication response has been received.
	authenticated: bool,
	/// Maximum time to wait for a response before giving up.
	timeout: Duration,
}
27
28impl BlockingSession {
29	/// Create a new blocking session
30	pub(crate) fn new(client: Arc<ClientInner>, token: Option<String>) -> Result<Self, Error> {
31		// Create a channel session and get the receiver
32		let (channel_session, receiver) = ChannelSession::new(client, token.clone())?;
33
34		let mut session = Self {
35			channel_session,
36			receiver,
37			authenticated: false,
38			timeout: Duration::from_secs(30),
39		};
40
41		// If token provided, wait for authentication response
42		if token.is_some() {
43			session.wait_for_auth()?;
44		}
45
46		Ok(session)
47	}
48
49	/// Wait for authentication response
50	fn wait_for_auth(&mut self) -> Result<(), Error> {
51		// Authentication was already initiated by ChannelSession::new
52		// We just need to wait for the response
53		match self.receiver.recv_timeout(self.timeout) {
54			Ok(msg) => match msg.response {
55				Ok(ChannelResponse::Auth {
56					..
57				}) => {
58					self.authenticated = true;
59					Ok(())
60				}
61				Err(e) => Err(e),
62				_ => panic!("Unexpected response type during authentication"),
63			},
64			Err(_) => panic!("Authentication timeout"),
65		}
66	}
67
68	/// Send a command and wait for response
69	pub fn command(&mut self, rql: &str, params: Option<Params>) -> Result<CommandResult, Error> {
70		// Send command through channel session
71		let request_id = self.channel_session.command(rql, params).map_err(|e| {
72			reifydb_type::Error(reifydb_type::diagnostic::internal(format!(
73				"Failed to send command: {}",
74				e
75			)))
76		})?;
77
78		// Wait for response
79		match self.receiver.recv_timeout(self.timeout) {
80			Ok(msg) => {
81				if msg.request_id != request_id {
82					panic!("Received response for wrong request ID");
83				}
84				match msg.response {
85					Ok(ChannelResponse::Command {
86						result,
87						..
88					}) => Ok(result),
89					Err(e) => Err(e),
90					_ => panic!("Unexpected response type for command"),
91				}
92			}
93			Err(_) => panic!("Command timeout"),
94		}
95	}
96
97	/// Send a query and wait for response
98	pub fn query(&mut self, rql: &str, params: Option<Params>) -> Result<QueryResult, Error> {
99		// Send query through channel session
100		let request_id = self.channel_session.query(rql, params).map_err(|e| {
101			reifydb_type::Error(reifydb_type::diagnostic::internal(format!("Failed to send query: {}", e)))
102		})?;
103
104		// Wait for response
105		match self.receiver.recv_timeout(self.timeout) {
106			Ok(msg) => {
107				if msg.request_id != request_id {
108					panic!("Received response for wrong request ID");
109				}
110				match msg.response {
111					Ok(ChannelResponse::Query {
112						result,
113						..
114					}) => Ok(result),
115					Err(e) => Err(e),
116					_ => panic!("Unexpected response type for query"),
117				}
118			}
119			Err(_) => panic!("Query timeout"),
120		}
121	}
122
123	/// Check if the session is authenticated
124	pub fn is_authenticated(&self) -> bool {
125		self.authenticated
126	}
127}