Skip to main content

nestforge_grpc/
lib.rs

1use std::{future::Future, net::SocketAddr, sync::Arc};
2
3use anyhow::{Context, Result};
4use nestforge_core::{
5    framework_log_event, initialize_module_runtime, Container, InitializedModule, ModuleDefinition,
6};
7use nestforge_microservices::{
8    EventEnvelope, MessageEnvelope, MicroserviceContext, MicroserviceRegistry, TransportMetadata,
9};
10use serde::Serialize;
11use tonic::Status;
12
13pub use prost;
14pub use tonic;
15
/// Listen settings for a NestForge gRPC server.
///
/// The type is a plain value: it can be cloned, printed with `{:?}` and compared for equality,
/// which makes it convenient to assert on in tests and to diff between configurations.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GrpcServerConfig {
    /// Address the server should listen on, kept as text (for example `127.0.0.1:50051`).
    ///
    /// The string is only validated when it is parsed by `socket_addr`.
    pub addr: String,
}
20
21impl Default for GrpcServerConfig {
22    fn default() -> Self {
23        Self {
24            addr: "127.0.0.1:50051".to_string(),
25        }
26    }
27}
28
29impl GrpcServerConfig {
30    pub fn new(addr: impl Into<String>) -> Self {
31        Self { addr: addr.into() }
32    }
33
34    pub fn socket_addr(&self) -> Result<SocketAddr> {
35        self.addr
36            .parse()
37            .with_context(|| format!("Invalid gRPC listen address `{}`", self.addr))
38    }
39}
40
41#[derive(Clone)]
42pub struct GrpcContext {
43    container: Container,
44}
45
46impl GrpcContext {
47    pub fn new(container: Container) -> Self {
48        Self { container }
49    }
50
51    pub fn container(&self) -> &Container {
52        &self.container
53    }
54
55    pub fn resolve<T>(&self) -> Result<Arc<T>, Status>
56    where
57        T: Send + Sync + 'static,
58    {
59        self.container.resolve::<T>().map_err(|err| {
60            Status::internal(format!(
61                "Failed to resolve dependency `{}`: {}",
62                std::any::type_name::<T>(),
63                err
64            ))
65        })
66    }
67
68    pub fn microservice_context(
69        &self,
70        pattern: impl Into<String>,
71        metadata: TransportMetadata,
72    ) -> MicroserviceContext {
73        MicroserviceContext::new(self.container.clone(), "grpc", pattern, metadata)
74    }
75}
76
77pub async fn dispatch_grpc_message<Payload>(
78    ctx: &GrpcContext,
79    registry: &MicroserviceRegistry,
80    pattern: impl Into<String>,
81    payload: Payload,
82    metadata: TransportMetadata,
83) -> Result<serde_json::Value, Status>
84where
85    Payload: Serialize,
86{
87    let pattern = pattern.into();
88    let envelope =
89        MessageEnvelope::new(pattern.clone(), payload).map_err(map_microservice_error)?;
90    let envelope = envelope.with_metadata(metadata.clone());
91    let context = ctx.microservice_context(pattern, metadata);
92
93    registry
94        .dispatch_message(envelope, context)
95        .await
96        .map_err(map_microservice_error)
97}
98
99pub async fn dispatch_grpc_event<Payload>(
100    ctx: &GrpcContext,
101    registry: &MicroserviceRegistry,
102    pattern: impl Into<String>,
103    payload: Payload,
104    metadata: TransportMetadata,
105) -> Result<(), Status>
106where
107    Payload: Serialize,
108{
109    let pattern = pattern.into();
110    let envelope = EventEnvelope::new(pattern.clone(), payload).map_err(map_microservice_error)?;
111    let envelope = envelope.with_metadata(metadata.clone());
112    let context = ctx.microservice_context(pattern, metadata);
113
114    registry
115        .dispatch_event(envelope, context)
116        .await
117        .map_err(map_microservice_error)
118}
119
120fn map_microservice_error(err: anyhow::Error) -> Status {
121    Status::internal(err.to_string())
122}
123
124pub struct NestForgeGrpcFactory<M: ModuleDefinition> {
125    _marker: std::marker::PhantomData<M>,
126    container: Container,
127    runtime: Arc<InitializedModule>,
128    config: GrpcServerConfig,
129}
130
131impl<M: ModuleDefinition> NestForgeGrpcFactory<M> {
132    pub fn create() -> Result<Self> {
133        let container = Container::new();
134        let runtime = Arc::new(initialize_module_runtime::<M>(&container)?);
135        runtime.run_module_init(&container)?;
136        runtime.run_application_bootstrap(&container)?;
137
138        Ok(Self {
139            _marker: std::marker::PhantomData,
140            container,
141            runtime,
142            config: GrpcServerConfig::default(),
143        })
144    }
145
146    pub fn with_addr(mut self, addr: impl Into<String>) -> Self {
147        self.config = GrpcServerConfig::new(addr);
148        self
149    }
150
151    pub fn with_config(mut self, config: GrpcServerConfig) -> Self {
152        self.config = config;
153        self
154    }
155
156    pub fn container(&self) -> &Container {
157        &self.container
158    }
159
160    pub fn context(&self) -> GrpcContext {
161        GrpcContext::new(self.container.clone())
162    }
163
164    pub fn socket_addr(&self) -> Result<SocketAddr> {
165        self.config.socket_addr()
166    }
167
168    pub async fn listen_with<F, Fut, E>(self, serve: F) -> Result<()>
169    where
170        F: FnOnce(GrpcContext, SocketAddr) -> Fut,
171        Fut: Future<Output = std::result::Result<(), E>>,
172        E: std::error::Error + Send + Sync + 'static,
173    {
174        let runtime = Arc::clone(&self.runtime);
175        let container = self.container.clone();
176        let addr = self.socket_addr()?;
177        framework_log_event("grpc_server_listening", &[("addr", addr.to_string())]);
178        serve(self.context(), addr)
179            .await
180            .map_err(anyhow::Error::new)
181            .context("gRPC transport server failed")?;
182        runtime.run_module_destroy(&container)?;
183        runtime.run_application_shutdown(&container)?;
184        Ok(())
185    }
186}
187
#[cfg(test)]
mod tests {
    use std::sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    };

    use nestforge_microservices::MicroserviceRegistry;

    use super::{dispatch_grpc_event, dispatch_grpc_message, GrpcContext, TransportMetadata};

    /// Injectable counter used to check that handlers can resolve values from the container.
    #[derive(Clone)]
    struct Counter(Arc<AtomicUsize>);

    /// A message sent through the adapter reaches its handler, and the handler's reply
    /// (the value of the registered `Counter`) comes back as JSON.
    #[tokio::test]
    async fn grpc_dispatch_adapter_invokes_message_registry() {
        let registry = MicroserviceRegistry::builder()
            .message("counter.read", |_payload: (), handler_ctx| async move {
                let resolved = handler_ctx.resolve::<Counter>()?;
                Ok(resolved.0.load(Ordering::Relaxed))
            })
            .build();

        let container = nestforge_core::Container::new();
        let seeded = Counter(Arc::new(AtomicUsize::new(3)));
        container.register(seeded).expect("counter should register");
        let grpc_ctx = GrpcContext::new(container);

        let metadata = TransportMetadata::new().insert("transport", "grpc");
        let reply = dispatch_grpc_message(&grpc_ctx, &registry, "counter.read", (), metadata)
            .await
            .expect("message should dispatch");

        assert_eq!(reply, serde_json::json!(3));
    }

    /// An event sent through the adapter runs its handler exactly once.
    #[tokio::test]
    async fn grpc_dispatch_adapter_invokes_event_registry() {
        let bumps = Arc::new(AtomicUsize::new(0));

        // The handler owns one handle to the counter and clones it for each invocation.
        let handler_bumps = Arc::clone(&bumps);
        let registry = MicroserviceRegistry::builder()
            .event("counter.bump", move |_payload: (), _ctx| {
                let bumps = Arc::clone(&handler_bumps);
                async move {
                    bumps.fetch_add(1, Ordering::Relaxed);
                    Ok(())
                }
            })
            .build();

        let grpc_ctx = GrpcContext::new(nestforge_core::Container::new());
        let metadata = TransportMetadata::default();
        dispatch_grpc_event(&grpc_ctx, &registry, "counter.bump", (), metadata)
            .await
            .expect("event should dispatch");

        assert_eq!(bumps.load(Ordering::Relaxed), 1);
    }
}