vantage_api_pool/
aww_pool.rs1use reqwest::{Client, Request, Response};
2use rust_decimal::Decimal;
3use std::future::Future;
4use std::sync::Mutex;
5use tokio::{sync::mpsc, task::JoinError};
6
7use crate::{EventualRequestMatcher, HttpClientPool};
8
/// Pinned, heap-allocated, type-erased future that resolves to `T`.
///
/// The erased future must be `Send` and `Sync` and may borrow data for `'a`.
type BoxFuture<'a, T> =
    std::pin::Pin<Box<dyn Future<Output = T> + Sync + Send + 'a>>;
10
/// Empty marker type used as the metadata parameter of the generic
/// `HttpClientPool` and `EventualRequestMatcher` held by `AwwPool`.
struct Metadata;
12
13#[allow(clippy::type_complexity)]
14pub struct AwwPool {
15 http_client_pool: HttpClientPool<Metadata>,
16 eventual_request_matcher: EventualRequestMatcher<Metadata>,
17 base_url: String,
18
19 auth_acquire_fn: Option<
21 std::sync::Arc<dyn Fn() -> BoxFuture<'static, Result<String, anyhow::Error>> + Send + Sync>,
22 >,
23 auth_apply_fn: Option<std::sync::Arc<dyn Fn(Request, &str) -> Request + Send + Sync>>,
24
25 auth_tokens: Mutex<Vec<String>>,
27}
28
29impl AwwPool {
30 pub fn new(
31 workers: usize,
32 rate_limit: Option<Decimal>,
33 use_dampener: bool,
34 base_url: String,
35 ) -> AwwPool {
36 let (request_sender, request_receiver) = mpsc::channel(100);
37
38 let (response_receiver, http_client_pool) =
39 HttpClientPool::new(workers, rate_limit, use_dampener, request_receiver);
40
41 Self {
42 http_client_pool,
43 eventual_request_matcher: EventualRequestMatcher::new(
44 request_sender,
45 response_receiver,
46 ),
47 base_url,
48 auth_acquire_fn: None,
49 auth_apply_fn: None,
50 auth_tokens: Mutex::new(Vec::new()),
51 }
52 }
53
54 pub async fn request(&self, mut request: Request) -> anyhow::Result<Response> {
56 if let (Some(acquire_fn), Some(apply_fn)) = (&self.auth_acquire_fn, &self.auth_apply_fn) {
58 let token = self.get_auth_token(acquire_fn).await?;
59 request = apply_fn(request, &token);
60 }
61
62 self.eventual_request_matcher
63 .send(request, None)
64 .await
65 .map_err(|e| anyhow::anyhow!(e))
66 }
67
68 pub async fn get(&self, path: &str) -> anyhow::Result<Response> {
69 let full_url = if path.starts_with('/') {
70 format!("{}{}", self.base_url, path)
71 } else {
72 format!("{}/{}", self.base_url, path)
73 };
74
75 let mut request = Client::builder().build()?.get(&full_url).build()?;
76
77 if let (Some(acquire_fn), Some(apply_fn)) = (&self.auth_acquire_fn, &self.auth_apply_fn) {
79 let token = self.get_auth_token(acquire_fn).await?;
80 request = apply_fn(request, &token);
81 }
82
83 self.eventual_request_matcher
84 .send(request, None)
85 .await
86 .map_err(|e| anyhow::anyhow!(e))
87 }
88
89 pub fn with_auth_callback<F, Fut, G>(
90 mut self,
91 n: usize,
92 token_acquirer: F,
93 request_modifier: G,
94 ) -> AwwPool
95 where
96 F: Fn() -> Fut + Send + Sync + 'static,
97 Fut: Future<Output = Result<String, anyhow::Error>> + Send + Sync + 'static,
98 G: Fn(Request, &str) -> Request + Send + Sync + 'static,
99 {
100 let async_fn =
102 move || Box::pin(token_acquirer()) as BoxFuture<'static, Result<String, anyhow::Error>>;
103
104 self.auth_acquire_fn = Some(std::sync::Arc::new(async_fn));
105 self.auth_apply_fn = Some(std::sync::Arc::new(request_modifier));
106 self.auth_tokens = Mutex::new(Vec::with_capacity(n));
107 self
108 }
109
110 async fn get_auth_token(
111 &self,
112 acquire_fn: &std::sync::Arc<
113 dyn Fn() -> BoxFuture<'static, Result<String, anyhow::Error>> + Send + Sync,
114 >,
115 ) -> anyhow::Result<String> {
116 {
118 let tokens = self.auth_tokens.lock().unwrap();
119 if !tokens.is_empty() {
120 return Ok(tokens[0].clone());
121 }
122 }
123
124 let token = acquire_fn().await?;
126
127 {
129 let mut tokens = self.auth_tokens.lock().unwrap();
130 if tokens.is_empty() {
131 tokens.push(token.clone());
132 }
133 }
134
135 Ok(token)
136 }
137
138 pub async fn shutdown(self) -> Result<(), JoinError> {
140 self.eventual_request_matcher.shutdown().await?;
141
142 self.http_client_pool.shutdown().await?;
143
144 Ok(())
145 }
146}