apalis_core/task/
metadata.rs1use crate::task::Task;
14use crate::task_fn::FromRequest;
15
16#[derive(Debug, Clone)]
18pub struct Meta<T>(T);
19pub trait MetadataExt<T> {
22 type Error;
24 fn extract(&self) -> Result<T, Self::Error>;
26 fn inject(&mut self, value: T) -> Result<(), Self::Error>;
28}
29
30impl<T, Args: Send + Sync, Ctx: MetadataExt<T> + Send + Sync, IdType: Send + Sync>
31 FromRequest<Task<Args, Ctx, IdType>> for Meta<T>
32{
33 type Error = Ctx::Error;
34
35 async fn from_request(task: &Task<Args, Ctx, IdType>) -> Result<Self, Self::Error> {
36 task.parts.ctx.extract().map(Meta)
37 }
38}
39
40#[cfg(test)]
41#[allow(unused)]
42mod tests {
43 use std::{convert::Infallible, fmt::Debug, task::Poll, time::Duration};
44
45 use crate::{
46 error::BoxDynError,
47 task::{
48 Task,
49 metadata::{Meta, MetadataExt},
50 },
51 task_fn::FromRequest,
52 };
53 use futures_core::future::BoxFuture;
54 use tower::Service;
55
56 #[derive(Debug, Clone)]
57 struct ExampleService<S> {
58 service: S,
59 }
60 #[derive(Debug, Clone, Default)]
61 struct ExampleConfig {
62 timeout: Duration,
63 }
64
65 struct SampleStore;
66
67 impl MetadataExt<ExampleConfig> for SampleStore {
68 type Error = Infallible;
69 fn extract(&self) -> Result<ExampleConfig, Self::Error> {
70 Ok(ExampleConfig {
71 timeout: Duration::from_secs(1),
72 })
73 }
74 fn inject(&mut self, _: ExampleConfig) -> Result<(), Self::Error> {
75 unreachable!()
76 }
77 }
78
79 #[cfg(feature = "json")]
80 impl<T: serde::de::DeserializeOwned> MetadataExt<T> for SampleStore {
81 type Error = Infallible;
82 fn extract(&self) -> Result<T, Self::Error> {
83 unimplemented!()
84 }
85 fn inject(&mut self, _: T) -> Result<(), Self::Error> {
86 unimplemented!()
87 }
88 }
89
90 impl<S, Args: Send + Sync + 'static, Ctx: Send + Sync + 'static, IdType: Send + Sync + 'static>
91 Service<Task<Args, Ctx, IdType>> for ExampleService<S>
92 where
93 S: Service<Task<Args, Ctx, IdType>> + Clone + Send + 'static,
94 Ctx: MetadataExt<ExampleConfig> + Send,
95 Ctx::Error: Debug,
96 S::Future: Send + 'static,
97 {
98 type Response = S::Response;
99 type Error = S::Error;
100 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
101
102 fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
103 self.service.poll_ready(cx)
104 }
105
106 fn call(&mut self, request: Task<Args, Ctx, IdType>) -> Self::Future {
107 let mut svc = self.service.clone();
108
109 Box::pin(async move {
111 let _config: Meta<ExampleConfig> = request.extract().await.unwrap();
112 svc.call(request).await
113 })
114 }
115 }
116}