redis_grpc/grpc/
server.rs

1use tonic::{transport::Server, Code, Request, Response, Status};
2
3use crate::conn::{MessageConsumer, RedisFacade};
4use crate::AppConfig;
5use redis_grpc::redis_grpc_server::{RedisGrpc, RedisGrpcServer};
6use redis_grpc::{
7    CommandRequest, CommandResponse, GetRequest, GetResponse, KeysRequest, KeysResponse,
8    PublishRequest, PublishResponse, SetRequest, SetResponse, SubscribeRequest, SubscribeResponse,
9};
10use std::sync::Arc;
11use std::time::Duration;
12use tokio_stream::wrappers::ReceiverStream;
13
/// Generated protobuf code for the `redis_grpc` package, pulled in at compile
/// time by `tonic::include_proto!`. The request/response messages and the
/// `redis_grpc_server` items imported at the top of this file come from here.
pub mod redis_grpc {
    tonic::include_proto!("redis_grpc");
}
17
/// gRPC service state: a [`RedisFacade`] handle plus the Redis host string the
/// service was configured with.
pub struct RedisGrpcImpl {
    /// Facade used by the request/response RPCs (`command`, `publish`, `keys`,
    /// `set`, `get`).
    redis: RedisFacade,
    /// Copy of `AppConfig::host`; `subscribe` hands it to
    /// `RedisFacade::subscribe_channels` for each new subscription.
    redis_host: String,
}
22
23impl RedisGrpcImpl {
24    pub async fn new(app_config: &AppConfig) -> Self {
25        let redis = RedisFacade::new(app_config.host.as_str()).await;
26        return RedisGrpcImpl {
27            redis,
28            redis_host: app_config.host.clone(),
29        };
30    }
31}
32
33#[tonic::async_trait]
34impl RedisGrpc for RedisGrpcImpl {
35    type SubscribeStream = ReceiverStream<Result<SubscribeResponse, Status>>;
36
37    async fn command(
38        &self,
39        request: Request<CommandRequest>,
40    ) -> Result<Response<CommandResponse>, Status> {
41        let request = request.into_inner();
42        let redis_result = self.redis.command(request.command.as_str()).await.unwrap();
43        let grpc_response = CommandResponse {
44            message: redis_result,
45        };
46        Ok(Response::new(grpc_response))
47    }
48
49    async fn subscribe(
50        &self,
51        request: Request<SubscribeRequest>,
52    ) -> Result<Response<Self::SubscribeStream>, Status> {
53        info!("Got a request: {:?}", request);
54        let (mut tx, rx) = tokio::sync::mpsc::channel(4);
55        let channels = request.into_inner().channels;
56        let url = self.redis_host.clone();
57        tokio::spawn(async move {
58            let consumer = SubscribeMessageConsumer(tx);
59            RedisFacade::subscribe_channels(&url, &channels, &consumer)
60                .await
61                .expect(format!("subscribe_channels exited: {:?}", channels).as_str());
62        });
63        Ok(Response::new(ReceiverStream::new(rx)))
64    }
65
66    async fn publish(
67        &self,
68        request: Request<PublishRequest>,
69    ) -> Result<Response<PublishResponse>, Status> {
70        let request = request.into_inner();
71        let redis_result = self
72            .redis
73            .publish(request.channel.as_str(), request.message.as_str())
74            .await;
75        let grpc_response = match redis_result {
76            Ok(result) => PublishResponse { result },
77            Err(err) => return Err(Status::new(Code::Internal, format!("{}", err))),
78        };
79        Ok(Response::new(grpc_response))
80    }
81
82    async fn keys(&self, request: Request<KeysRequest>) -> Result<Response<KeysResponse>, Status> {
83        let request = request.into_inner();
84        let redis_result = self.redis.keys(request.pattern.as_str()).await;
85        let grpc_response = match redis_result {
86            Ok(result) => KeysResponse {
87                success: true,
88                error: String::default(),
89                result,
90            },
91            Err(err) => KeysResponse {
92                success: false,
93                error: format!("{}", err),
94                result: vec![],
95            },
96        };
97        Ok(Response::new(grpc_response))
98    }
99
100    async fn set(&self, request: Request<SetRequest>) -> Result<Response<SetResponse>, Status> {
101        let request = request.into_inner();
102        let redis_result = self
103            .redis
104            .set(request.key.as_str(), request.value.as_str())
105            .await;
106        let grpc_response = match redis_result {
107            Ok(result) => SetResponse {
108                success: true,
109                error: String::default(),
110                result,
111            },
112            Err(err) => SetResponse {
113                success: false,
114                error: format!("{}", err),
115                result: String::default(),
116            },
117        };
118        Ok(Response::new(grpc_response))
119    }
120
121    async fn get(&self, request: Request<GetRequest>) -> Result<Response<GetResponse>, Status> {
122        let request = request.into_inner();
123        let redis_result = self.redis.get(request.key.as_str()).await;
124        let grpc_response = match redis_result {
125            Ok(result) => GetResponse {
126                success: true,
127                error: String::default(),
128                result,
129            },
130            Err(err) => GetResponse {
131                success: false,
132                error: format!("{}", err),
133                result: String::default(),
134            },
135        };
136        Ok(Response::new(grpc_response))
137    }
138}
139
/// [`MessageConsumer`] implementation that wraps the sending half of a tokio
/// mpsc channel carrying `Result<SubscribeResponse, tonic::Status>` items.
pub struct SubscribeMessageConsumer(
    // Sender that `consume` pushes each converted message into.
    tokio::sync::mpsc::Sender<Result<SubscribeResponse, tonic::Status>>,
);
143#[tonic::async_trait]
144impl MessageConsumer for SubscribeMessageConsumer {
145    async fn consume(&self, message: redis::Msg) -> anyhow::Result<()> {
146        let response = SubscribeResponse {
147            channel: message.get_channel::<String>().unwrap(),
148            // pattern: message.get_pattern::<redis::Value>().unwrap(),
149            message: message.get_payload::<String>().unwrap(),
150        };
151        self.0.send(Ok(response)).await?;
152        Ok(())
153    }
154}
155
/// Field-less entry point used to start the gRPC server.
pub struct RedisGrpcService {}
157impl RedisGrpcService {
158    pub fn new() -> Self {
159        RedisGrpcService {}
160    }
161    pub async fn subscribe(&self, app_config: &AppConfig) -> anyhow::Result<()> {
162        let socket_addr = format!("0.0.0.0:{port}", port = app_config.port).parse()?;
163        let redis_grpc = RedisGrpcImpl::new(app_config).await;
164        Server::builder()
165            .accept_http1(true)
166            .add_service(tonic_web::enable(RedisGrpcServer::new(redis_grpc)))
167            .serve(socket_addr)
168            .await?;
169
170        Ok(())
171    }
172}