1#![deny(missing_docs)]
2
3pub use deadpool_redis::redis;
40use deadpool_redis::PoolError;
41use deadpool_redis::Runtime;
42use redis::FromRedisValue;
43use redis::RedisError;
44use redis::ToRedisArgs;
45use std::sync::Arc;
46mod config;
47pub mod lock;
49pub mod work_queue;
51use futures::stream::FuturesUnordered;
52use futures::StreamExt;
53use redis::AsyncCommands;
54use redis::RedisResult;
55use serde::de::DeserializeOwned;
56use serde::Serialize;
57use std::future::Future;
58use std::pin::Pin;
59use std::sync::atomic::AtomicBool;
60use std::sync::atomic::Ordering;
61use std::time::Duration;
62use tokio::runtime::Builder;
63use tokio::sync::RwLock;
64use tokio::task::LocalSet;
65use work_queue::Item;
66use work_queue::KeyPrefix;
67use work_queue::WorkQueue;
68
69pub use crate::config::Config as RedisConfig;
70use crate::lock::Lock;
71
72#[derive(Debug, thiserror::Error)]
74pub enum CacheError {
75 #[error(transparent)]
77 Pool(#[from] PoolError),
78 #[error(transparent)]
80 Redis(#[from] RedisError),
81 #[error("failed to fetch: {0}")]
83 Failure(String),
84}
85
86#[derive(serde::Serialize, serde::Deserialize)]
90pub struct Json<T>(T);
91
92impl<T> FromRedisValue for Json<T>
93where
94 T: DeserializeOwned,
95{
96 fn from_redis_value(v: redis::Value) -> Result<Self, redis::ParsingError> {
97 if let redis::Value::SimpleString(s) = v {
98 serde_json::from_str(&s).map_err(|e| redis::ParsingError::from(e.to_string()))
99 } else {
100 Err(redis::ParsingError::from("expected simple string value"))
101 }
102 }
103}
104
105impl<T> ToRedisArgs for Json<T>
106where
107 T: Serialize,
108{
109 fn write_redis_args<W>(&self, out: &mut W)
110 where
111 W: ?Sized + redis::RedisWrite,
112 {
113 let v = serde_json::to_string(&self.0).unwrap_or_default();
114 v.write_redis_args(out);
115 }
116}
117
118pub struct Inner {
120 config: RedisConfig,
121 client: redis::Client,
122 pool: deadpool_redis::Pool,
123}
124
125#[derive(Clone)]
127pub struct Redis {
128 inner: Arc<Inner>,
129}
130
131impl AsRef<deadpool_redis::Pool> for Redis {
132 fn as_ref(&self) -> &deadpool_redis::Pool {
133 &self.inner.pool
134 }
135}
136
137impl Redis {
138 pub fn new() -> anyhow::Result<Self> {
140 let config = RedisConfig::builder().build()?;
141 let client = redis::Client::open(config.address())?;
142 let redis_cfg = deadpool_redis::Config::from_url(config.address());
143 let pool = redis_cfg.create_pool(Some(Runtime::Tokio1))?;
144 Ok(Self {
145 inner: Arc::new(Inner {
146 config,
147 client,
148 pool,
149 }),
150 })
151 }
152
153 pub fn config(&self) -> &RedisConfig {
155 &self.inner.config
156 }
157
158 pub fn client(&self) -> &redis::Client {
160 &self.inner.client
161 }
162
163 pub fn pool(&self) -> Arc<deadpool_redis::Pool> {
165 Arc::new(self.inner.pool.clone())
166 }
167
168 pub async fn connect(&self) -> Result<deadpool_redis::Connection, deadpool_redis::PoolError> {
170 self.inner.pool.get().await
171 }
172
173 pub async fn cleanup(&self) -> anyhow::Result<()> {
175 let mut con = self.connect().await?;
176 let _: redis::Value = redis::cmd("FLUSHALL").query_async(&mut con).await?;
177 Ok(())
178 }
179
180 pub async fn lock(
182 &self,
183 key: &str,
184 ttl: usize,
185 retry_count: u32,
186 retry_delay: u32,
187 ) -> Result<Lock, lock::Error> {
188 let mut con = self.connect().await?;
189 lock::lock(&mut con, key, ttl, retry_count, retry_delay).await
190 }
191
192 pub async fn unlock(&self, key: &str, lock_id: &str) -> Result<i64, lock::Error> {
194 let mut con = self.connect().await?;
195 lock::unlock(&mut con, key, lock_id).await
196 }
197}
198
199pub async fn mutex_run<S, O, E, F>(lock_name: S, redis: &Redis, f: F) -> Result<O, E>
208where
209 S: AsRef<str>,
210 F: std::future::Future<Output = Result<O, E>>,
211 E: From<self::lock::Error>,
212{
213 let lock = redis.lock(lock_name.as_ref(), 5000, 20, 250).await?;
214
215 let result = f.await;
216
217 redis.unlock(lock_name.as_ref(), &lock.id).await?;
218
219 result
220}
221
/// Implements `AsRef<qm::redis::Redis>` for the given storage type.
///
/// The generated implementation returns `&self.inner.redis`, so the type must
/// expose an `inner` field holding a `redis` field of that type.
#[macro_export]
macro_rules! redis {
    ($storage:ty) => {
        impl AsRef<qm::redis::Redis> for $storage {
            fn as_ref(&self) -> &qm::redis::Redis {
                &self.inner.redis
            }
        }
    };
}
233
234pub type RunningWorkers =
236 FuturesUnordered<Pin<Box<dyn Future<Output = String> + Send + Sync + 'static>>>;
237
238pub type ExecItemFuture = Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + 'static>>;
240
241pub struct WorkerContext<Ctx>
243where
244 Ctx: Clone + Send + Sync + 'static,
245{
246 ctx: Ctx,
247 pub worker_id: usize,
249 pub queue: Arc<WorkQueue>,
251 pub client: Arc<redis::Client>,
253 pub item: Item,
255}
256
257impl<Ctx> WorkerContext<Ctx>
258where
259 Ctx: Clone + Send + Sync + 'static,
260{
261 pub fn ctx(&self) -> &Ctx {
263 &self.ctx
264 }
265
266 pub async fn complete(&self) -> anyhow::Result<()> {
268 let mut con = self.client.get_multiplexed_async_connection().await?;
269 self.queue.complete(&mut con, &self.item).await?;
270 Ok(())
271 }
272}
273
274async fn add(
275 is_running: Arc<AtomicBool>,
276 instances: Arc<RwLock<Option<RunningWorkers>>>,
277 fut: Pin<Box<dyn Future<Output = String> + Send + Sync + 'static>>,
278) {
279 if !is_running.load(Ordering::SeqCst) {
280 return;
281 }
282 instances.write().await.as_mut().unwrap().push(fut);
283}
284
285#[async_trait::async_trait]
287pub trait Work<Ctx, T>: Send + Sync
288where
289 Ctx: Clone + Send + Sync + 'static,
290 T: DeserializeOwned + Send + Sync,
291{
292 async fn run(&self, ctx: WorkerContext<Ctx>, item: T) -> anyhow::Result<()>;
294}
295
296async fn run_recovery_worker<Ctx, T>(
297 client: Arc<redis::Client>,
298 is_running: Arc<AtomicBool>,
299 worker: Arc<AsyncWorker<Ctx, T>>,
300) -> anyhow::Result<()>
301where
302 Ctx: Clone + Send + Sync + 'static,
303 T: DeserializeOwned + Send + Sync,
304{
305 tracing::info!("start {} worker recovery", worker.prefix);
306 let mut con = client.get_multiplexed_async_connection().await?;
307 loop {
308 if !is_running.load(Ordering::SeqCst) {
309 break;
310 }
311 tokio::time::sleep(Duration::from_secs(10)).await;
312 worker.recover(&mut con).await?;
313 }
314 Ok(())
315}
316
317async fn run_worker_queue<Ctx, T>(
318 ctx: Ctx,
319 client: Arc<redis::Client>,
320 is_running: Arc<AtomicBool>,
321 worker: Arc<AsyncWorker<Ctx, T>>,
322 worker_id: usize,
323) -> anyhow::Result<()>
324where
325 Ctx: Clone + Send + Sync + 'static,
326 T: DeserializeOwned + Send + Sync,
327{
328 tracing::info!("start {} worker #{worker_id} queue", worker.prefix);
329 let request_queue = Arc::new(WorkQueue::new(KeyPrefix::new(worker.prefix.clone())));
330 let mut con = client.get_multiplexed_async_connection().await?;
331 loop {
332 if !is_running.load(Ordering::SeqCst) {
333 break;
334 }
335 if let Some(item) = request_queue
336 .lease(
337 &mut con,
338 Some(Duration::from_secs(worker.timeout)),
339 Duration::from_secs(worker.lease_duration),
340 )
341 .await?
342 {
343 if item.data.is_empty() {
344 tracing::info!("item is empty");
345 request_queue.complete(&mut con, &item).await?;
346 continue;
347 }
348 if let Ok(request) = serde_json::from_slice::<T>(&item.data).inspect_err(|_| {
349 tracing::error!(
350 "invalid request item on worker {} #{worker_id} Item: {}",
351 worker.prefix,
352 String::from_utf8_lossy(&item.data)
353 );
354 }) {
355 if let Some(work) = worker.work.as_ref() {
356 work.run(
357 WorkerContext {
358 ctx: ctx.clone(),
359 worker_id,
360 queue: request_queue.clone(),
361 client: client.clone(),
362 item: Item {
363 id: item.id.clone(),
364 data: Box::new([]),
365 },
366 },
367 request,
368 )
369 .await?;
370 }
371 } else {
372 request_queue.complete(&mut con, &item).await?;
373 }
374 }
375 }
376 Ok(())
377}
378
379struct WorkerInner {
380 client: Arc<redis::Client>,
381 instances: Arc<RwLock<Option<RunningWorkers>>>,
382 is_running: Arc<AtomicBool>,
383}
384
385#[derive(Clone)]
389pub struct Workers {
390 inner: Arc<WorkerInner>,
391}
392
393impl Workers {
394 pub fn new(config: &RedisConfig) -> RedisResult<Self> {
396 let client = Arc::new(redis::Client::open(config.address())?);
397 Ok(Self::new_with_client(client))
398 }
399
400 pub fn new_with_client(client: Arc<redis::Client>) -> Self {
402 Self {
403 inner: Arc::new(WorkerInner {
404 client,
405 instances: Arc::new(RwLock::new(Some(RunningWorkers::default()))),
406 is_running: Arc::new(AtomicBool::new(true)),
407 }),
408 }
409 }
410
411 pub async fn start<Ctx, T>(&self, ctx: Ctx, worker: AsyncWorker<Ctx, T>) -> anyhow::Result<()>
413 where
414 Ctx: Clone + Send + Sync + 'static,
415 T: DeserializeOwned + Send + Sync + 'static,
416 {
417 let worker = Arc::new(worker);
418 let mut con = self.inner.client.get_multiplexed_async_connection().await?;
419 worker.recover(&mut con).await?;
420 {
421 let instances = self.inner.instances.clone();
422 let client = self.inner.client.clone();
423 let worker = worker.clone();
424 let _th = std::thread::spawn(move || {
425 let rt = Builder::new_current_thread().enable_all().build().unwrap();
426 let local = LocalSet::new();
427 local.spawn_local(async move {
428 let fut_worker = worker.clone();
429 let (tx, rx) = tokio::sync::oneshot::channel::<()>();
430 let is_running = Arc::new(AtomicBool::new(true));
431 let is_fut_running = is_running.clone();
432 add(
433 is_running.clone(),
434 instances,
435 Box::pin(async move {
436 let worker = fut_worker.clone();
437 tracing::info!("stopping {} recovery", worker.prefix);
438 is_fut_running.store(false, Ordering::SeqCst);
439 rx.await.ok();
440 " recovery".to_string()
441 }),
442 )
443 .await;
444 if let Err(err) = run_recovery_worker(client, is_running, worker).await {
445 tracing::error!("{err:#?}");
446 std::process::exit(1);
447 }
448 tx.send(()).ok();
449 });
450 rt.block_on(local);
451 });
452 }
453 for worker_id in 0..worker.num_workers {
454 let worker = worker.clone();
455 let client = self.inner.client.clone();
456 let ctx = ctx.clone();
457 let instances = self.inner.instances.clone();
458 let _th = std::thread::spawn(move || {
459 let rt = Builder::new_current_thread().enable_all().build().unwrap();
460 let local = LocalSet::new();
461 local.spawn_local(async move {
462 let fut_worker = worker.clone();
463 let (tx, rx) = tokio::sync::oneshot::channel::<()>();
464 let is_running = Arc::new(AtomicBool::new(true));
465 let is_fut_running = is_running.clone();
466 add(
467 is_running.clone(),
468 instances,
469 Box::pin(async move {
470 let worker = fut_worker.clone();
471 tracing::info!("stopping {} #{worker_id}", worker.prefix);
472 is_fut_running.store(false, Ordering::SeqCst);
473 rx.await.ok();
474 format!("{} worker #{worker_id}", fut_worker.prefix)
475 }),
476 )
477 .await;
478 if let Err(err) =
479 run_worker_queue(ctx.clone(), client, is_running, worker, worker_id).await
480 {
481 tracing::error!("{err:#?}");
482 std::process::exit(1);
483 }
484 tx.send(()).ok();
485 });
486 rt.block_on(local);
487 });
488 }
489 Ok(())
490 }
491
492 pub async fn terminate(&self) -> anyhow::Result<()> {
494 if !self.inner.is_running.load(Ordering::SeqCst) {
495 anyhow::bail!("Workers already terminated");
496 }
497 let mut futs = self.inner.instances.write().await.take().unwrap();
498 tracing::info!("try stopping {} workers", futs.len());
499 while let Some(result) = futs.next().await {
500 tracing::info!("stopped {}", result);
501 }
502 Ok(())
503 }
504}
505
506pub struct Producer {
510 client: Arc<deadpool_redis::Pool>,
511 queue: WorkQueue,
512}
513
514impl Producer {
515 pub fn new<S>(config: &RedisConfig, prefix: S) -> anyhow::Result<Self>
517 where
518 S: Into<String>,
519 {
520 let redis_cfg = deadpool_redis::Config::from_url(config.address());
521 let redis = Arc::new(redis_cfg.create_pool(Some(Runtime::Tokio1))?);
522 Ok(Self::new_with_client(redis, prefix))
523 }
524
525 pub fn new_with_client<S>(client: Arc<deadpool_redis::Pool>, prefix: S) -> Self
527 where
528 S: Into<String>,
529 {
530 let queue = WorkQueue::new(KeyPrefix::new(prefix.into()));
531 Self { client, queue }
532 }
533
534 pub async fn add_item_with_connection<C, T>(&self, db: &mut C, data: &T) -> anyhow::Result<()>
536 where
537 C: AsyncCommands,
538 T: Serialize,
539 {
540 let item = Item::from_json_data(data)?;
541 self.queue.add_item(db, &item).await?;
542 Ok(())
543 }
544
545 pub async fn add_item<T>(&self, data: &T) -> anyhow::Result<()>
547 where
548 T: Serialize,
549 {
550 let item = Item::from_json_data(data)?;
551 let mut con = self.client.get().await?;
552 self.queue.add_item(&mut con, &item).await?;
553 Ok(())
554 }
555}
556
557pub struct AsyncWorker<Ctx, T>
562where
563 Ctx: Clone + Send + Sync + 'static,
564 T: DeserializeOwned + Send + Sync,
565{
566 prefix: String,
567 num_workers: usize,
568 timeout: u64,
569 lease_duration: u64,
570 recovery_key: String,
571 recovery_queue: WorkQueue,
572 work: Option<Box<dyn Work<Ctx, T>>>,
573}
574
575impl<Ctx, T> AsyncWorker<Ctx, T>
576where
577 Ctx: Clone + Send + Sync + 'static,
578 T: DeserializeOwned + Send + Sync,
579{
580 pub fn new<S>(prefix: S) -> Self
582 where
583 S: Into<String>,
584 {
585 let prefix = prefix.into();
586 let name = KeyPrefix::new(prefix.clone());
587 Self {
588 recovery_key: name.of(":clean"),
589 recovery_queue: WorkQueue::new(name),
590 timeout: 5,
591 lease_duration: 60,
592 num_workers: 1,
593 prefix,
594 work: None,
595 }
596 }
597
598 pub fn with_timeout(mut self, timeout: u64) -> Self {
600 self.timeout = timeout;
601 self
602 }
603
604 pub fn with_lease_duration(mut self, lease_duration: u64) -> Self {
606 self.lease_duration = lease_duration;
607 self
608 }
609
610 pub fn with_num_workers(mut self, num_workers: usize) -> Self {
612 self.num_workers = num_workers;
613 self
614 }
615
616 pub fn producer(&self, client: Arc<deadpool_redis::Pool>) -> Producer {
618 Producer {
619 client,
620 queue: WorkQueue::new(KeyPrefix::new(self.prefix.clone())),
621 }
622 }
623
624 pub async fn recover<C: AsyncCommands>(&self, db: &mut C) -> anyhow::Result<()> {
626 let l = lock::lock(db, &self.recovery_key, 3600, 36, 100).await?;
627 self.recovery_queue.recover(db).await?;
628 lock::unlock(db, &self.recovery_key, l.id).await?;
629 Ok(())
630 }
631
632 pub fn run(mut self, work: impl Work<Ctx, T> + 'static) -> Self {
634 self.work = Some(Box::new(work));
635 self
636 }
637}