Skip to main content

reifydb_sub_server/
actor.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4//! Unified server actor for HTTP, gRPC, and WebSocket transports.
5//!
6//! The same `handle()` code runs in both native (rayon pool) and DST modes.
7//! Protocol-specific concerns (serialization, HTTP status codes, etc.) live in
8//! the transport layer — this actor only does engine dispatch and auth.
9
10use reifydb_auth::service::{AuthResponse, AuthService};
11use reifydb_core::{
12	actors::server::{
13		ServerAuthResponse, ServerLogoutResponse, ServerMessage, ServerResponse, ServerSubscribeResponse,
14	},
15	execution::ExecutionResult,
16};
17use reifydb_engine::{engine::StandardEngine, session::RetryStrategy};
18use reifydb_runtime::{
19	actor::{
20		context::Context,
21		reply::Reply,
22		traits::{Actor, Directive},
23	},
24	context::clock::Clock,
25};
26use reifydb_type::{params::Params, value::identity::IdentityId};
27
28pub struct ServerActor {
29	engine: StandardEngine,
30	auth_service: AuthService,
31	clock: Clock,
32	retry: RetryStrategy,
33}
34
35impl ServerActor {
36	pub fn new(engine: StandardEngine, auth_service: AuthService, clock: Clock) -> Self {
37		Self {
38			engine,
39			auth_service,
40			clock,
41			retry: RetryStrategy::default(),
42		}
43	}
44
45	fn dispatch_execute(
46		&self,
47		identity: IdentityId,
48		rql: String,
49		params: Params,
50		reply: Reply<ServerResponse>,
51		execute: impl Fn(&StandardEngine, IdentityId, &str, Params) -> ExecutionResult,
52	) {
53		let t = self.clock.instant();
54		let result = self
55			.retry
56			.execute(self.engine.rng(), &rql, || execute(&self.engine, identity, &rql, params.clone()));
57		if let Some(err) = result.error {
58			reply.send(ServerResponse::EngineError {
59				diagnostic: Box::new(err.diagnostic()),
60				rql,
61			});
62		} else {
63			reply.send(ServerResponse::Success {
64				frames: result.frames,
65				duration: t.elapsed(),
66				metrics: result.metrics,
67			});
68		}
69	}
70}
71
72impl Actor for ServerActor {
73	type State = ();
74	type Message = ServerMessage;
75
76	fn init(&self, _ctx: &Context<Self::Message>) -> Self::State {}
77
78	fn handle(&self, _state: &mut (), msg: ServerMessage, _ctx: &Context<ServerMessage>) -> Directive {
79		match msg {
80			ServerMessage::Query {
81				identity,
82				rql,
83				params,
84				reply,
85			} => {
86				self.dispatch_execute(identity, rql, params, reply, |e, id, s, p| e.query_as(id, s, p));
87			}
88			ServerMessage::Command {
89				identity,
90				rql,
91				params,
92				reply,
93			} => {
94				self.dispatch_execute(identity, rql, params, reply, |e, id, s, p| {
95					e.command_as(id, s, p)
96				});
97			}
98			ServerMessage::Admin {
99				identity,
100				rql,
101				params,
102				reply,
103			} => {
104				self.dispatch_execute(identity, rql, params, reply, |e, id, s, p| e.admin_as(id, s, p));
105			}
106			ServerMessage::Subscribe {
107				identity,
108				rql,
109				reply,
110			} => {
111				let t = self.clock.instant();
112				let result = self.engine.subscribe_as(identity, &rql, Params::None);
113				if let Some(err) = result.error {
114					reply.send(ServerSubscribeResponse::EngineError {
115						diagnostic: Box::new(err.diagnostic()),
116						rql,
117					});
118				} else {
119					reply.send(ServerSubscribeResponse::Subscribed {
120						frames: result.frames,
121						duration: t.elapsed(),
122						metrics: result.metrics,
123					});
124				}
125			}
126			ServerMessage::Authenticate {
127				method,
128				credentials,
129				reply,
130			} => match self.auth_service.authenticate(&method, credentials) {
131				Ok(AuthResponse::Authenticated {
132					identity,
133					token,
134				}) => reply.send(ServerAuthResponse::Authenticated {
135					identity,
136					token,
137				}),
138				Ok(AuthResponse::Challenge {
139					challenge_id,
140					payload,
141				}) => reply.send(ServerAuthResponse::Challenge {
142					challenge_id,
143					payload,
144				}),
145				Ok(AuthResponse::Failed {
146					reason,
147				}) => reply.send(ServerAuthResponse::Failed {
148					reason,
149				}),
150				Err(e) => reply.send(ServerAuthResponse::Error(e.to_string())),
151			},
152			ServerMessage::Logout {
153				token,
154				reply,
155			} => {
156				if self.auth_service.revoke_token(&token) {
157					reply.send(ServerLogoutResponse::Ok);
158				} else {
159					reply.send(ServerLogoutResponse::InvalidToken);
160				}
161			}
162		}
163		Directive::Continue
164	}
165}