edp_node/
gen_server.rs

1// Copyright (C) 2025-2026 Michael S. Klishin and Contributors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::errors::Result;
16use crate::mailbox::Message;
17use crate::process::Process;
18use crate::registry::ProcessRegistry;
19use erltf::OwnedTerm;
20use erltf::types::{Atom, ExternalPid, ExternalReference};
21use std::future::Future;
22use std::sync::Arc;
23
24pub enum CallResult {
25    Reply(OwnedTerm),
26    NoReply,
27}
28
29pub trait GenServer: Send + 'static {
30    fn init(&mut self, args: Vec<OwnedTerm>) -> impl Future<Output = Result<()>> + Send + '_;
31
32    fn handle_call(
33        &mut self,
34        msg: OwnedTerm,
35        from: ExternalPid,
36    ) -> impl Future<Output = Result<CallResult>> + Send + '_;
37
38    fn handle_cast(&mut self, msg: OwnedTerm) -> impl Future<Output = Result<()>> + Send + '_;
39
40    fn handle_info(&mut self, msg: OwnedTerm) -> impl Future<Output = Result<()>> + Send + '_;
41
42    fn terminate(&mut self, _reason: OwnedTerm) -> impl Future<Output = ()> + Send + '_ {
43        async move {}
44    }
45}
46
47pub struct GenServerProcess<T: GenServer> {
48    server: T,
49    call_tag: Atom,
50    cast_tag: Atom,
51    registry: Arc<ProcessRegistry>,
52}
53
54impl<T: GenServer> GenServerProcess<T> {
55    pub fn new(server: T, registry: Arc<ProcessRegistry>) -> Self {
56        Self {
57            server,
58            call_tag: Atom::new("$gen_call"),
59            cast_tag: Atom::new("$gen_cast"),
60            registry,
61        }
62    }
63
64    async fn handle_gen_call(
65        &mut self,
66        from_pid: ExternalPid,
67        reference: ExternalReference,
68        request: OwnedTerm,
69    ) -> Result<()> {
70        let result = self.server.handle_call(request, from_pid.clone()).await?;
71
72        match result {
73            CallResult::Reply(reply) => {
74                let reply_msg = OwnedTerm::Tuple(vec![OwnedTerm::Reference(reference), reply]);
75
76                if let Some(handle) = self.registry.get(&from_pid).await {
77                    handle
78                        .send(Message::Regular {
79                            from: None,
80                            body: reply_msg,
81                        })
82                        .await?;
83                }
84
85                Ok(())
86            }
87            CallResult::NoReply => Ok(()),
88        }
89    }
90
91    async fn handle_gen_cast(&mut self, request: OwnedTerm) -> Result<()> {
92        self.server.handle_cast(request).await
93    }
94}
95
96impl<T: GenServer> Process for GenServerProcess<T> {
97    async fn handle_message(&mut self, msg: Message) -> Result<()> {
98        match msg {
99            Message::Regular { from: _, body } => {
100                if let OwnedTerm::Tuple(elements) = &body
101                    && elements.len() >= 2
102                    && let OwnedTerm::Atom(tag) = &elements[0]
103                {
104                    if tag == &self.call_tag && elements.len() == 3 {
105                        if let OwnedTerm::Tuple(from_tuple) = &elements[1]
106                            && from_tuple.len() == 2
107                            && let OwnedTerm::Pid(from_pid) = &from_tuple[0]
108                            && let OwnedTerm::Reference(reference) = &from_tuple[1]
109                        {
110                            let request = elements[2].clone();
111                            return self
112                                .handle_gen_call(from_pid.clone(), reference.clone(), request)
113                                .await;
114                        }
115                    } else if tag == &self.cast_tag && elements.len() == 2 {
116                        return self.handle_gen_cast(elements[1].clone()).await;
117                    }
118                }
119
120                self.server.handle_info(body).await
121            }
122            Message::Control { .. } => Ok(()),
123            Message::Exit { reason, .. } => {
124                self.server.terminate(reason).await;
125                Ok(())
126            }
127            _ => Ok(()),
128        }
129    }
130
131    async fn terminate(&mut self) {
132        self.server
133            .terminate(OwnedTerm::Atom(Atom::new("normal")))
134            .await;
135    }
136}