redis_grpc/grpc/
server.rs1use tonic::{transport::Server, Code, Request, Response, Status};
2
3use crate::conn::{MessageConsumer, RedisFacade};
4use crate::AppConfig;
5use redis_grpc::redis_grpc_server::{RedisGrpc, RedisGrpcServer};
6use redis_grpc::{
7 CommandRequest, CommandResponse, GetRequest, GetResponse, KeysRequest, KeysResponse,
8 PublishRequest, PublishResponse, SetRequest, SetResponse, SubscribeRequest, SubscribeResponse,
9};
10use std::sync::Arc;
11use std::time::Duration;
12use tokio_stream::wrappers::ReceiverStream;
13
14pub mod redis_grpc {
15 tonic::include_proto!("redis_grpc");
16}
17
18pub struct RedisGrpcImpl {
19 redis: RedisFacade,
20 redis_host: String,
21}
22
23impl RedisGrpcImpl {
24 pub async fn new(app_config: &AppConfig) -> Self {
25 let redis = RedisFacade::new(app_config.host.as_str()).await;
26 return RedisGrpcImpl {
27 redis,
28 redis_host: app_config.host.clone(),
29 };
30 }
31}
32
33#[tonic::async_trait]
34impl RedisGrpc for RedisGrpcImpl {
35 type SubscribeStream = ReceiverStream<Result<SubscribeResponse, Status>>;
36
37 async fn command(
38 &self,
39 request: Request<CommandRequest>,
40 ) -> Result<Response<CommandResponse>, Status> {
41 let request = request.into_inner();
42 let redis_result = self.redis.command(request.command.as_str()).await.unwrap();
43 let grpc_response = CommandResponse {
44 message: redis_result,
45 };
46 Ok(Response::new(grpc_response))
47 }
48
49 async fn subscribe(
50 &self,
51 request: Request<SubscribeRequest>,
52 ) -> Result<Response<Self::SubscribeStream>, Status> {
53 info!("Got a request: {:?}", request);
54 let (mut tx, rx) = tokio::sync::mpsc::channel(4);
55 let channels = request.into_inner().channels;
56 let url = self.redis_host.clone();
57 tokio::spawn(async move {
58 let consumer = SubscribeMessageConsumer(tx);
59 RedisFacade::subscribe_channels(&url, &channels, &consumer)
60 .await
61 .expect(format!("subscribe_channels exited: {:?}", channels).as_str());
62 });
63 Ok(Response::new(ReceiverStream::new(rx)))
64 }
65
66 async fn publish(
67 &self,
68 request: Request<PublishRequest>,
69 ) -> Result<Response<PublishResponse>, Status> {
70 let request = request.into_inner();
71 let redis_result = self
72 .redis
73 .publish(request.channel.as_str(), request.message.as_str())
74 .await;
75 let grpc_response = match redis_result {
76 Ok(result) => PublishResponse { result },
77 Err(err) => return Err(Status::new(Code::Internal, format!("{}", err))),
78 };
79 Ok(Response::new(grpc_response))
80 }
81
82 async fn keys(&self, request: Request<KeysRequest>) -> Result<Response<KeysResponse>, Status> {
83 let request = request.into_inner();
84 let redis_result = self.redis.keys(request.pattern.as_str()).await;
85 let grpc_response = match redis_result {
86 Ok(result) => KeysResponse {
87 success: true,
88 error: String::default(),
89 result,
90 },
91 Err(err) => KeysResponse {
92 success: false,
93 error: format!("{}", err),
94 result: vec![],
95 },
96 };
97 Ok(Response::new(grpc_response))
98 }
99
100 async fn set(&self, request: Request<SetRequest>) -> Result<Response<SetResponse>, Status> {
101 let request = request.into_inner();
102 let redis_result = self
103 .redis
104 .set(request.key.as_str(), request.value.as_str())
105 .await;
106 let grpc_response = match redis_result {
107 Ok(result) => SetResponse {
108 success: true,
109 error: String::default(),
110 result,
111 },
112 Err(err) => SetResponse {
113 success: false,
114 error: format!("{}", err),
115 result: String::default(),
116 },
117 };
118 Ok(Response::new(grpc_response))
119 }
120
121 async fn get(&self, request: Request<GetRequest>) -> Result<Response<GetResponse>, Status> {
122 let request = request.into_inner();
123 let redis_result = self.redis.get(request.key.as_str()).await;
124 let grpc_response = match redis_result {
125 Ok(result) => GetResponse {
126 success: true,
127 error: String::default(),
128 result,
129 },
130 Err(err) => GetResponse {
131 success: false,
132 error: format!("{}", err),
133 result: String::default(),
134 },
135 };
136 Ok(Response::new(grpc_response))
137 }
138}
139
140pub struct SubscribeMessageConsumer(
141 tokio::sync::mpsc::Sender<Result<SubscribeResponse, tonic::Status>>,
142);
143#[tonic::async_trait]
144impl MessageConsumer for SubscribeMessageConsumer {
145 async fn consume(&self, message: redis::Msg) -> anyhow::Result<()> {
146 let response = SubscribeResponse {
147 channel: message.get_channel::<String>().unwrap(),
148 message: message.get_payload::<String>().unwrap(),
150 };
151 self.0.send(Ok(response)).await?;
152 Ok(())
153 }
154}
155
156pub struct RedisGrpcService {}
157impl RedisGrpcService {
158 pub fn new() -> Self {
159 RedisGrpcService {}
160 }
161 pub async fn subscribe(&self, app_config: &AppConfig) -> anyhow::Result<()> {
162 let socket_addr = format!("0.0.0.0:{port}", port = app_config.port).parse()?;
163 let redis_grpc = RedisGrpcImpl::new(app_config).await;
164 Server::builder()
165 .accept_http1(true)
166 .add_service(tonic_web::enable(RedisGrpcServer::new(redis_grpc)))
167 .serve(socket_addr)
168 .await?;
169
170 Ok(())
171 }
172}