Skip to main content

vantage_api_pool/
aww_pool.rs

1use reqwest::{Client, Request, Response};
2use rust_decimal::Decimal;
3use std::future::Future;
4use std::sync::Mutex;
5use tokio::{sync::mpsc, task::JoinError};
6
7use crate::{EventualRequestMatcher, HttpClientPool};
8
/// Heap-allocated, pinned, type-erased future that yields `T`.
///
/// Besides `Send`, the future must also be `Sync`. That is stricter than the common
/// `Send`-only boxed future, and it is why `AwwPool::with_auth_callback` asks callers for a
/// `Send + Sync` future as well.
type BoxFuture<'a, T> = std::pin::Pin<Box<dyn Future<Output = T> + Send + Sync + 'a>>;
10
/// Marker type used as the metadata parameter of `HttpClientPool` and
/// `EventualRequestMatcher`. It has no fields, so it carries no per-request data.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
struct Metadata {}
12
13#[allow(clippy::type_complexity)]
14pub struct AwwPool {
15    http_client_pool: HttpClientPool<Metadata>,
16    eventual_request_matcher: EventualRequestMatcher<Metadata>,
17    base_url: String,
18
19    // Auth callbacks
20    auth_acquire_fn: Option<
21        std::sync::Arc<dyn Fn() -> BoxFuture<'static, Result<String, anyhow::Error>> + Send + Sync>,
22    >,
23    auth_apply_fn: Option<std::sync::Arc<dyn Fn(Request, &str) -> Request + Send + Sync>>,
24
25    // Token pool
26    auth_tokens: Mutex<Vec<String>>,
27}
28
29impl AwwPool {
30    pub fn new(
31        workers: usize,
32        rate_limit: Option<Decimal>,
33        use_dampener: bool,
34        base_url: String,
35    ) -> AwwPool {
36        let (request_sender, request_receiver) = mpsc::channel(100);
37
38        let (response_receiver, http_client_pool) =
39            HttpClientPool::new(workers, rate_limit, use_dampener, request_receiver);
40
41        Self {
42            http_client_pool,
43            eventual_request_matcher: EventualRequestMatcher::new(
44                request_sender,
45                response_receiver,
46            ),
47            base_url,
48            auth_acquire_fn: None,
49            auth_apply_fn: None,
50            auth_tokens: Mutex::new(Vec::new()),
51        }
52    }
53
54    /// pool.request(client.post(url).build());
55    pub async fn request(&self, mut request: Request) -> anyhow::Result<Response> {
56        // Apply auth if configured
57        if let (Some(acquire_fn), Some(apply_fn)) = (&self.auth_acquire_fn, &self.auth_apply_fn) {
58            let token = self.get_auth_token(acquire_fn).await?;
59            request = apply_fn(request, &token);
60        }
61
62        self.eventual_request_matcher
63            .send(request, None)
64            .await
65            .map_err(|e| anyhow::anyhow!(e))
66    }
67
68    pub async fn get(&self, path: &str) -> anyhow::Result<Response> {
69        let full_url = if path.starts_with('/') {
70            format!("{}{}", self.base_url, path)
71        } else {
72            format!("{}/{}", self.base_url, path)
73        };
74
75        let mut request = Client::builder().build()?.get(&full_url).build()?;
76
77        // Apply auth if configured
78        if let (Some(acquire_fn), Some(apply_fn)) = (&self.auth_acquire_fn, &self.auth_apply_fn) {
79            let token = self.get_auth_token(acquire_fn).await?;
80            request = apply_fn(request, &token);
81        }
82
83        self.eventual_request_matcher
84            .send(request, None)
85            .await
86            .map_err(|e| anyhow::anyhow!(e))
87    }
88
89    pub fn with_auth_callback<F, Fut, G>(
90        mut self,
91        n: usize,
92        token_acquirer: F,
93        request_modifier: G,
94    ) -> AwwPool
95    where
96        F: Fn() -> Fut + Send + Sync + 'static,
97        Fut: Future<Output = Result<String, anyhow::Error>> + Send + Sync + 'static,
98        G: Fn(Request, &str) -> Request + Send + Sync + 'static,
99    {
100        // Pin the future inside the method
101        let async_fn =
102            move || Box::pin(token_acquirer()) as BoxFuture<'static, Result<String, anyhow::Error>>;
103
104        self.auth_acquire_fn = Some(std::sync::Arc::new(async_fn));
105        self.auth_apply_fn = Some(std::sync::Arc::new(request_modifier));
106        self.auth_tokens = Mutex::new(Vec::with_capacity(n));
107        self
108    }
109
110    async fn get_auth_token(
111        &self,
112        acquire_fn: &std::sync::Arc<
113            dyn Fn() -> BoxFuture<'static, Result<String, anyhow::Error>> + Send + Sync,
114        >,
115    ) -> anyhow::Result<String> {
116        // Check if we already have a cached token
117        {
118            let tokens = self.auth_tokens.lock().unwrap();
119            if !tokens.is_empty() {
120                return Ok(tokens[0].clone());
121            }
122        }
123
124        // No cached token, acquire a new one
125        let token = acquire_fn().await?;
126
127        // Cache the token
128        {
129            let mut tokens = self.auth_tokens.lock().unwrap();
130            if tokens.is_empty() {
131                tokens.push(token.clone());
132            }
133        }
134
135        Ok(token)
136    }
137
138    // Gracefully shuts down the AwwPool, ensuring all resources are cleaned up.
139    pub async fn shutdown(self) -> Result<(), JoinError> {
140        self.eventual_request_matcher.shutdown().await?;
141
142        self.http_client_pool.shutdown().await?;
143
144        Ok(())
145    }
146}