1use reifydb_auth::service::{AuthResponse, AuthService};
11use reifydb_core::{
12 actors::server::{
13 ServerAuthResponse, ServerLogoutResponse, ServerMessage, ServerResponse, ServerSubscribeResponse,
14 },
15 execution::ExecutionResult,
16};
17use reifydb_engine::{engine::StandardEngine, session::RetryStrategy};
18use reifydb_runtime::{
19 actor::{
20 context::Context,
21 reply::Reply,
22 traits::{Actor, Directive},
23 },
24 context::clock::Clock,
25};
26use reifydb_type::{params::Params, value::identity::IdentityId};
27
28pub struct ServerActor {
29 engine: StandardEngine,
30 auth_service: AuthService,
31 clock: Clock,
32 retry: RetryStrategy,
33}
34
35impl ServerActor {
36 pub fn new(engine: StandardEngine, auth_service: AuthService, clock: Clock) -> Self {
37 Self {
38 engine,
39 auth_service,
40 clock,
41 retry: RetryStrategy::default(),
42 }
43 }
44
45 fn dispatch_execute(
46 &self,
47 identity: IdentityId,
48 rql: String,
49 params: Params,
50 reply: Reply<ServerResponse>,
51 execute: impl Fn(&StandardEngine, IdentityId, &str, Params) -> ExecutionResult,
52 ) {
53 let t = self.clock.instant();
54 let result = self
55 .retry
56 .execute(self.engine.rng(), &rql, || execute(&self.engine, identity, &rql, params.clone()));
57 if let Some(err) = result.error {
58 reply.send(ServerResponse::EngineError {
59 diagnostic: Box::new(err.diagnostic()),
60 rql,
61 });
62 } else {
63 reply.send(ServerResponse::Success {
64 frames: result.frames,
65 duration: t.elapsed(),
66 metrics: result.metrics,
67 });
68 }
69 }
70}
71
72impl Actor for ServerActor {
73 type State = ();
74 type Message = ServerMessage;
75
76 fn init(&self, _ctx: &Context<Self::Message>) -> Self::State {}
77
78 fn handle(&self, _state: &mut (), msg: ServerMessage, _ctx: &Context<ServerMessage>) -> Directive {
79 match msg {
80 ServerMessage::Query {
81 identity,
82 rql,
83 params,
84 reply,
85 } => {
86 self.dispatch_execute(identity, rql, params, reply, |e, id, s, p| e.query_as(id, s, p));
87 }
88 ServerMessage::Command {
89 identity,
90 rql,
91 params,
92 reply,
93 } => {
94 self.dispatch_execute(identity, rql, params, reply, |e, id, s, p| {
95 e.command_as(id, s, p)
96 });
97 }
98 ServerMessage::Admin {
99 identity,
100 rql,
101 params,
102 reply,
103 } => {
104 self.dispatch_execute(identity, rql, params, reply, |e, id, s, p| e.admin_as(id, s, p));
105 }
106 ServerMessage::Subscribe {
107 identity,
108 rql,
109 reply,
110 } => {
111 let t = self.clock.instant();
112 let result = self.engine.subscribe_as(identity, &rql, Params::None);
113 if let Some(err) = result.error {
114 reply.send(ServerSubscribeResponse::EngineError {
115 diagnostic: Box::new(err.diagnostic()),
116 rql,
117 });
118 } else {
119 reply.send(ServerSubscribeResponse::Subscribed {
120 frames: result.frames,
121 duration: t.elapsed(),
122 metrics: result.metrics,
123 });
124 }
125 }
126 ServerMessage::Authenticate {
127 method,
128 credentials,
129 reply,
130 } => match self.auth_service.authenticate(&method, credentials) {
131 Ok(AuthResponse::Authenticated {
132 identity,
133 token,
134 }) => reply.send(ServerAuthResponse::Authenticated {
135 identity,
136 token,
137 }),
138 Ok(AuthResponse::Challenge {
139 challenge_id,
140 payload,
141 }) => reply.send(ServerAuthResponse::Challenge {
142 challenge_id,
143 payload,
144 }),
145 Ok(AuthResponse::Failed {
146 reason,
147 }) => reply.send(ServerAuthResponse::Failed {
148 reason,
149 }),
150 Err(e) => reply.send(ServerAuthResponse::Error(e.to_string())),
151 },
152 ServerMessage::Logout {
153 token,
154 reply,
155 } => {
156 if self.auth_service.revoke_token(&token) {
157 reply.send(ServerLogoutResponse::Ok);
158 } else {
159 reply.send(ServerLogoutResponse::InvalidToken);
160 }
161 }
162 }
163 Directive::Continue
164 }
165}