Skip to main content

nestforge_grpc/
lib.rs

1use std::{future::Future, net::SocketAddr, sync::Arc};
2
3use anyhow::{Context, Result};
4use nestforge_core::{
5    framework_log_event, initialize_module_runtime, Container, InitializedModule, ModuleDefinition,
6};
7use nestforge_microservices::{
8    EventEnvelope, MessageEnvelope, MicroserviceContext, MicroserviceRegistry, TransportMetadata,
9};
10use serde::Serialize;
11use tonic::Status;
12
13pub use prost;
14pub use tonic;
15
16#[derive(Debug, Clone)]
17pub struct GrpcServerConfig {
18    pub addr: String,
19}
20
21impl Default for GrpcServerConfig {
22    fn default() -> Self {
23        Self {
24            addr: "127.0.0.1:50051".to_string(),
25        }
26    }
27}
28
29impl GrpcServerConfig {
30    pub fn new(addr: impl Into<String>) -> Self {
31        Self { addr: addr.into() }
32    }
33
34    pub fn socket_addr(&self) -> Result<SocketAddr> {
35        self.addr
36            .parse()
37            .with_context(|| format!("Invalid gRPC listen address `{}`", self.addr))
38    }
39}
40
41#[derive(Clone)]
42pub struct GrpcContext {
43    container: Container,
44}
45
46impl GrpcContext {
47    pub fn new(container: Container) -> Self {
48        Self { container }
49    }
50
51    pub fn container(&self) -> &Container {
52        &self.container
53    }
54
55    pub fn resolve<T>(&self) -> Result<Arc<T>, Status>
56    where
57        T: Send + Sync + 'static,
58    {
59        self.container.resolve::<T>().map_err(|err| {
60            Status::internal(format!(
61                "Failed to resolve dependency `{}`: {}",
62                std::any::type_name::<T>(),
63                err
64            ))
65        })
66    }
67
68    pub fn microservice_context(
69        &self,
70        pattern: impl Into<String>,
71        metadata: TransportMetadata,
72    ) -> MicroserviceContext {
73        MicroserviceContext::new(self.container.clone(), "grpc", pattern, metadata)
74    }
75}
76
77pub async fn dispatch_grpc_message<Payload>(
78    ctx: &GrpcContext,
79    registry: &MicroserviceRegistry,
80    pattern: impl Into<String>,
81    payload: Payload,
82    metadata: TransportMetadata,
83) -> Result<serde_json::Value, Status>
84where
85    Payload: Serialize,
86{
87    let pattern = pattern.into();
88    let envelope = MessageEnvelope::new(pattern.clone(), payload).map_err(map_microservice_error)?;
89    let envelope = envelope.with_metadata(metadata.clone());
90    let context = ctx.microservice_context(pattern, metadata);
91
92    registry
93        .dispatch_message(envelope, context)
94        .await
95        .map_err(map_microservice_error)
96}
97
98pub async fn dispatch_grpc_event<Payload>(
99    ctx: &GrpcContext,
100    registry: &MicroserviceRegistry,
101    pattern: impl Into<String>,
102    payload: Payload,
103    metadata: TransportMetadata,
104) -> Result<(), Status>
105where
106    Payload: Serialize,
107{
108    let pattern = pattern.into();
109    let envelope = EventEnvelope::new(pattern.clone(), payload).map_err(map_microservice_error)?;
110    let envelope = envelope.with_metadata(metadata.clone());
111    let context = ctx.microservice_context(pattern, metadata);
112
113    registry
114        .dispatch_event(envelope, context)
115        .await
116        .map_err(map_microservice_error)
117}
118
119fn map_microservice_error(err: anyhow::Error) -> Status {
120    Status::internal(err.to_string())
121}
122
123pub struct NestForgeGrpcFactory<M: ModuleDefinition> {
124    _marker: std::marker::PhantomData<M>,
125    container: Container,
126    runtime: Arc<InitializedModule>,
127    config: GrpcServerConfig,
128}
129
130impl<M: ModuleDefinition> NestForgeGrpcFactory<M> {
131    pub fn create() -> Result<Self> {
132        let container = Container::new();
133        let runtime = Arc::new(initialize_module_runtime::<M>(&container)?);
134        runtime.run_module_init(&container)?;
135        runtime.run_application_bootstrap(&container)?;
136
137        Ok(Self {
138            _marker: std::marker::PhantomData,
139            container,
140            runtime,
141            config: GrpcServerConfig::default(),
142        })
143    }
144
145    pub fn with_addr(mut self, addr: impl Into<String>) -> Self {
146        self.config = GrpcServerConfig::new(addr);
147        self
148    }
149
150    pub fn with_config(mut self, config: GrpcServerConfig) -> Self {
151        self.config = config;
152        self
153    }
154
155    pub fn container(&self) -> &Container {
156        &self.container
157    }
158
159    pub fn context(&self) -> GrpcContext {
160        GrpcContext::new(self.container.clone())
161    }
162
163    pub fn socket_addr(&self) -> Result<SocketAddr> {
164        self.config.socket_addr()
165    }
166
167    pub async fn listen_with<F, Fut, E>(self, serve: F) -> Result<()>
168    where
169        F: FnOnce(GrpcContext, SocketAddr) -> Fut,
170        Fut: Future<Output = std::result::Result<(), E>>,
171        E: std::error::Error + Send + Sync + 'static,
172    {
173        let runtime = Arc::clone(&self.runtime);
174        let container = self.container.clone();
175        let addr = self.socket_addr()?;
176        framework_log_event("grpc_server_listening", &[("addr", addr.to_string())]);
177        serve(self.context(), addr)
178            .await
179            .map_err(anyhow::Error::new)
180            .context("gRPC transport server failed")?;
181        runtime.run_module_destroy(&container)?;
182        runtime.run_application_shutdown(&container)?;
183        Ok(())
184    }
185}
186
187#[cfg(test)]
188mod tests {
189    use std::sync::{
190        atomic::{AtomicUsize, Ordering},
191        Arc,
192    };
193
194    use nestforge_microservices::MicroserviceRegistry;
195
196    use super::{dispatch_grpc_event, dispatch_grpc_message, GrpcContext, TransportMetadata};
197
198    #[derive(Clone)]
199    struct Counter(Arc<AtomicUsize>);
200
201    #[tokio::test]
202    async fn grpc_dispatch_adapter_invokes_message_registry() {
203        let container = nestforge_core::Container::new();
204        container
205            .register(Counter(Arc::new(AtomicUsize::new(3))))
206            .expect("counter should register");
207        let ctx = GrpcContext::new(container);
208        let registry = MicroserviceRegistry::builder()
209            .message("counter.read", |_payload: (), ctx| async move {
210                let counter = ctx.resolve::<Counter>()?;
211                Ok(counter.0.load(Ordering::Relaxed))
212            })
213            .build();
214
215        let response = dispatch_grpc_message(
216            &ctx,
217            &registry,
218            "counter.read",
219            (),
220            TransportMetadata::new().insert("transport", "grpc"),
221        )
222        .await
223        .expect("message should dispatch");
224
225        assert_eq!(response, serde_json::json!(3));
226    }
227
228    #[tokio::test]
229    async fn grpc_dispatch_adapter_invokes_event_registry() {
230        let counter = Arc::new(AtomicUsize::new(0));
231        let ctx = GrpcContext::new(nestforge_core::Container::new());
232        let registry = MicroserviceRegistry::builder()
233            .event("counter.bump", {
234                let counter = Arc::clone(&counter);
235                move |_payload: (), _ctx| {
236                    let counter = Arc::clone(&counter);
237                    async move {
238                        counter.fetch_add(1, Ordering::Relaxed);
239                        Ok(())
240                    }
241                }
242            })
243            .build();
244
245        dispatch_grpc_event(
246            &ctx,
247            &registry,
248            "counter.bump",
249            (),
250            TransportMetadata::default(),
251        )
252        .await
253        .expect("event should dispatch");
254
255        assert_eq!(counter.load(Ordering::Relaxed), 1);
256    }
257}