apalis_core/task/
metadata.rs1use crate::task::Task;
14use crate::task_fn::FromRequest;
15
/// Extractor wrapper around a metadata value of type `T`.
///
/// A `Meta<T>` is produced from a task by the [`FromRequest`] implementation in this
/// module, which asks the task's context for a `T` through [`MetadataExt::extract`].
///
/// The wrapped value is public so that code receiving a `Meta<T>` can read it with
/// `.0` or destructure it with `Meta(value)`.
#[derive(Debug, Clone)]
pub struct Meta<T>(pub T);
/// Typed access to metadata held by a task context.
///
/// A context implements `MetadataExt<T>` once for every metadata type `T` it is able
/// to hand out or accept.
pub trait MetadataExt<T> {
    /// Error reported when a value cannot be extracted or injected.
    type Error;

    /// Reads a value of type `T` out of `self`.
    fn extract(&self) -> Result<T, Self::Error>;

    /// Injects `value` into `self`.
    fn inject(&mut self, value: T) -> Result<(), Self::Error>;
}
29
30impl<T, Args: Send + Sync, Ctx: MetadataExt<T> + Send + Sync, IdType: Send + Sync>
31 FromRequest<Task<Args, Ctx, IdType>> for Meta<T>
32{
33 type Error = Ctx::Error;
34
35 async fn from_request(task: &Task<Args, Ctx, IdType>) -> Result<Self, Self::Error> {
36 task.parts.ctx.extract().map(Meta)
37 }
38}
39
#[cfg(test)]
// The items below only need to type-check; nothing in this module runs them, so the
// compiler would otherwise flag them (and some of the imports) as unused.
#[allow(unused)]
mod tests {
    use std::{convert::Infallible, fmt::Debug, task::Poll, time::Duration};

    use crate::{
        error::BoxDynError,
        task::{
            metadata::{Meta, MetadataExt},
            Task,
        },
        task_fn::FromRequest,
    };
    use futures_core::future::BoxFuture;
    use tower::Service;

    /// Middleware that pulls an [`ExampleConfig`] out of each task before forwarding the
    /// task to the wrapped service.
    #[derive(Debug, Clone)]
    struct ExampleService<S> {
        service: S,
    }

    /// Sample metadata payload used by the fixtures in this module.
    #[derive(Debug, Clone, Default)]
    struct ExampleConfig {
        timeout: Duration,
    }

    /// Stand-in context type that the `MetadataExt` impls below are attached to.
    struct SampleStore;

    impl MetadataExt<ExampleConfig> for SampleStore {
        type Error = Infallible;

        /// Always yields a config with a one-second timeout.
        fn extract(&self) -> Result<ExampleConfig, Self::Error> {
            Ok(ExampleConfig {
                timeout: Duration::from_secs(1),
            })
        }

        /// Not exercised by anything in this module; panics if called.
        fn inject(&mut self, _: ExampleConfig) -> Result<(), Self::Error> {
            unreachable!()
        }
    }

    // Blanket impl for deserializable types; only compiled with the `json` feature.
    // Both methods are stubs that panic if called.
    #[cfg(feature = "json")]
    impl<T: serde::de::DeserializeOwned> MetadataExt<T> for SampleStore {
        type Error = Infallible;

        fn extract(&self) -> Result<T, Self::Error> {
            unimplemented!()
        }

        fn inject(&mut self, _: T) -> Result<(), Self::Error> {
            unimplemented!()
        }
    }

    impl<S, Args, Ctx, IdType> Service<Task<Args, Ctx, IdType>> for ExampleService<S>
    where
        S: Service<Task<Args, Ctx, IdType>> + Clone + Send + 'static,
        S::Future: Send + 'static,
        Args: Send + Sync + 'static,
        Ctx: MetadataExt<ExampleConfig> + Send + Sync + 'static,
        Ctx::Error: Debug,
        IdType: Send + Sync + 'static,
    {
        type Response = S::Response;
        type Error = S::Error;
        type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

        /// Readiness is delegated to the wrapped service.
        fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
            self.service.poll_ready(cx)
        }

        /// Extracts the config from the task, then forwards the task to the wrapped service.
        ///
        /// # Panics
        ///
        /// The returned future panics if extracting the config fails.
        fn call(&mut self, request: Task<Args, Ctx, IdType>) -> Self::Future {
            // `poll_ready` drove `self.service` to readiness; a fresh clone carries no such
            // guarantee. Move the ready instance into the future and leave the clone behind
            // for the next `poll_ready`/`call` cycle.
            let replacement = self.service.clone();
            let mut ready = std::mem::replace(&mut self.service, replacement);

            Box::pin(async move {
                // Pull the config out of the task before handing the task on.
                let _config: Meta<ExampleConfig> = request.extract().await.unwrap();
                ready.call(request).await
            })
        }
    }
}