Skip to main content

alien_bindings/providers/worker/
grpc.rs

1use crate::error::{ErrorData, Result};
2use crate::traits::{Binding, Worker, WorkerInvokeRequest, WorkerInvokeResponse};
3use alien_error::{Context as _, IntoAlienError as _};
4use async_trait::async_trait;
5use std::collections::BTreeMap;
6use std::fmt::{Debug, Formatter};
7use tonic::transport::Channel;
8
// Import generated protobuf types.
//
// `tonic::include_proto!` pulls in the code generated for the
// `alien_bindings.worker` protobuf package; the request messages and the
// service client used in this file are imported from this module.
// NOTE(review): presumably produced by a tonic-build step in build.rs — confirm.
pub mod proto {
    tonic::include_proto!("alien_bindings.worker");
}
13
14use proto::{worker_service_client::WorkerServiceClient, GetWorkerUrlRequest, InvokeRequest};
15
/// gRPC-based Worker implementation that forwards calls to a remote Worker service.
///
/// Every request sent through this type carries `binding_name` in its
/// `binding_name` field.
pub struct GrpcWorker {
    /// Generated tonic client for the worker service; a clone is taken for each call.
    client: WorkerServiceClient<Channel>,
    /// Binding name attached to every outgoing request.
    binding_name: String,
}
21
22impl Debug for GrpcWorker {
23    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
24        f.debug_struct("GrpcWorker")
25            .field("binding_name", &self.binding_name)
26            .finish()
27    }
28}
29
30impl GrpcWorker {
31    /// Create a new gRPC Worker client
32    pub async fn new(binding_name: String, grpc_endpoint: String) -> Result<Self> {
33        let channel = crate::providers::grpc_provider::create_grpc_channel(grpc_endpoint).await?;
34        Self::new_from_channel(channel, binding_name).await
35    }
36
37    /// Create a new gRPC Worker client from an existing channel
38    pub async fn new_from_channel(channel: Channel, binding_name: String) -> Result<Self> {
39        let client = WorkerServiceClient::new(channel);
40
41        Ok(Self {
42            client,
43            binding_name,
44        })
45    }
46}
47
// Empty impl: no `Binding` items are defined or overridden for `GrpcWorker` here.
impl Binding for GrpcWorker {}
49
50#[async_trait]
51impl Worker for GrpcWorker {
52    async fn invoke(&self, request: WorkerInvokeRequest) -> Result<WorkerInvokeResponse> {
53        let mut client = self.client.clone();
54
55        let grpc_request = tonic::Request::new(InvokeRequest {
56            binding_name: self.binding_name.clone(),
57            target_worker: request.target_worker,
58            method: request.method,
59            path: request.path,
60            headers: request.headers.into_iter().collect(),
61            body: request.body,
62            timeout_seconds: request.timeout.map(|t| t.as_secs()),
63        });
64
65        let response = client
66            .invoke(grpc_request)
67            .await
68            .into_alien_error()
69            .context(ErrorData::GrpcRequestFailed {
70                service: "WorkerService".to_string(),
71                method: "invoke".to_string(),
72                details: "Failed to invoke worker".to_string(),
73            })?;
74
75        let response_inner = response.into_inner();
76        Ok(WorkerInvokeResponse {
77            status: response_inner.status as u16,
78            headers: response_inner
79                .headers
80                .into_iter()
81                .collect::<BTreeMap<_, _>>(),
82            body: response_inner.body,
83        })
84    }
85
86    async fn get_worker_url(&self) -> Result<Option<String>> {
87        let mut client = self.client.clone();
88
89        let grpc_request = tonic::Request::new(GetWorkerUrlRequest {
90            binding_name: self.binding_name.clone(),
91        });
92
93        let response = client
94            .get_worker_url(grpc_request)
95            .await
96            .into_alien_error()
97            .context(ErrorData::GrpcRequestFailed {
98                service: "WorkerService".to_string(),
99                method: "get_worker_url".to_string(),
100                details: "Failed to get worker URL".to_string(),
101            })?;
102
103        Ok(response.into_inner().url)
104    }
105
106    fn as_any(&self) -> &dyn std::any::Any {
107        self
108    }
109}