office_convert_client/
load.rs

1use crate::{OfficeConvertClient, RequestError};
2use bytes::Bytes;
3use std::time::Duration;
4use tokio::{
5    sync::{Mutex, MutexGuard, Semaphore, SemaphorePermit},
6    time::{Instant, sleep_until},
7};
8use tracing::{debug, error};
9
/// Timing and retry configuration for [`OfficeConvertLoadBalancer`]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LoadBalancerConfig {
    /// Minimum time between two external busy checks of the same client.
    /// A client that reported busy (or whose busy check failed) is not
    /// asked again until this much time has passed
    pub retry_busy_check_after: Duration,
    /// Time to wait before repeated attempts
    ///
    /// NOTE(review): not read by the load balancer in this module — confirm
    /// whether anything still uses it
    pub retry_single_external: Duration,
    /// Timeout to wait on a notifier for
    ///
    /// NOTE(review): the load balancer in this module does not wait on a
    /// notifier — confirm whether anything still uses it
    pub notify_timeout: Duration,
    /// Number of times a conversion is retried when the request fails with a
    /// retryable error (e.g. connection loss). A file is attempted at most
    /// `retry_attempts + 1` times in total
    pub retry_attempts: usize,
}

impl Default for LoadBalancerConfig {
    fn default() -> Self {
        Self {
            retry_busy_check_after: Duration::from_secs(5),
            retry_single_external: Duration::from_secs(1),
            notify_timeout: Duration::from_secs(120),
            retry_attempts: 3,
        }
    }
}

33struct ClientSlot {
34    /// The actual client
35    client: OfficeConvertClient,
36
37    /// If the server was busy the last time it was checked this is the
38    /// timestamp when the next busy check is allowed to be performed
39    next_busy_check: Option<Instant>,
40}
41
42/// Round robbin load balancer, will pass convert jobs
43/// around to the next available client, connections
44/// will wait until there is an available client
45pub struct OfficeConvertLoadBalancer {
46    /// Available clients the load balancer can use
47    clients: Vec<Mutex<ClientSlot>>,
48
49    /// Permit for each client to track number of currently
50    /// used client and waiting for free clients
51    client_permit: Semaphore,
52
53    /// Timing for various actions
54    config: LoadBalancerConfig,
55}
56
57enum TryAcquireResult<'a> {
58    Acquired {
59        client: MutexGuard<'a, ClientSlot>,
60        permit: SemaphorePermit<'a>,
61    },
62
63    /// All clients are currently active
64    BusyInternally,
65
66    /// All available clients are currently blocked externally
67    BusyExternally {
68        /// Instant that clients should wake up at to check
69        /// again for a new available client
70        next_wake_time: Instant,
71    },
72}
73
74impl OfficeConvertLoadBalancer {
75    /// Creates a load balancer from the provided collection of clients
76    ///
77    /// ## Arguments
78    /// * `clients` - The clients to load balance amongst
79    pub fn new<I>(clients: I) -> Self
80    where
81        I: IntoIterator<Item = OfficeConvertClient>,
82    {
83        Self::new_with_timing(clients, Default::default())
84    }
85
86    /// Creates a load balancer from the provided collection of clients
87    /// with timing configuration
88    ///
89    /// ## Arguments
90    /// * `clients` - The clients to load balance amongst
91    /// * `timing` - Timing configuration
92    pub fn new_with_timing<I>(clients: I, timing: LoadBalancerConfig) -> Self
93    where
94        I: IntoIterator<Item = OfficeConvertClient>,
95    {
96        let clients = clients
97            .into_iter()
98            .map(|client| {
99                Mutex::new(ClientSlot {
100                    client,
101                    next_busy_check: None,
102                })
103            })
104            .collect::<Vec<_>>();
105
106        let total_clients = clients.len();
107
108        Self {
109            clients,
110            client_permit: Semaphore::new(total_clients),
111            config: timing,
112        }
113    }
114
115    pub async fn convert(&self, file: Bytes) -> Result<bytes::Bytes, RequestError> {
116        let mut attempt = 0;
117
118        let error = loop {
119            let (client, _client_permit) = self.acquire_client().await;
120
121            match client.client.convert(file.clone()).await {
122                Ok(value) => return Ok(value),
123                Err(error) => {
124                    if error.is_retry() {
125                        tracing::error!(
126                            ?error,
127                            "connection error while attempting to convert, retrying"
128                        );
129
130                        attempt += 1;
131
132                        if attempt <= self.config.retry_attempts {
133                            continue;
134                        }
135
136                        break error;
137                    }
138
139                    return Err(error);
140                }
141            }
142        };
143
144        Err(error)
145    }
146
147    /// Acquire a client, will wait until a new client is available
148    async fn acquire_client(&self) -> (MutexGuard<'_, ClientSlot>, SemaphorePermit<'_>) {
149        loop {
150            match self.try_acquire_client().await {
151                TryAcquireResult::Acquired { client, permit } => return (client, permit),
152
153                TryAcquireResult::BusyInternally => {
154                    // Retry immediately so we wait on acquiring the next permit. Realistically
155                    // this state would only ever occur if a permit was obtained before a client
156                    // lock was released
157                    continue;
158                }
159
160                TryAcquireResult::BusyExternally { next_wake_time } => {
161                    let now = Instant::now();
162
163                    // Check for time drift
164                    if now > next_wake_time {
165                        continue;
166                    }
167
168                    // Sleep until next check
169                    sleep_until(next_wake_time).await;
170                }
171            }
172        }
173    }
174
175    /// Attempt to acquire a client that is ready to be used
176    /// and attempt a conversion
177    async fn try_acquire_client(&self) -> TryAcquireResult<'_> {
178        // Acquire a permit to obtain a client
179        let client_permit = self
180            .client_permit
181            .acquire()
182            .await
183            .expect("client permit was closed");
184
185        let mut next_wake_time = None;
186
187        for (index, slot) in self.clients.iter().enumerate() {
188            let mut client_lock = match slot.try_lock() {
189                Ok(client_lock) => client_lock,
190                // Server is already in use, skip it
191                Err(_) => continue,
192            };
193
194            let slot = &mut *client_lock;
195
196            // If we have more than one client and this client was already checked for being busy earlier
197            // then this client will be skipped and won't be checked until a later point
198            if let Some(next_busy_check) = slot.next_busy_check {
199                // If the busy check for this task is sooner than the next wake time prefer
200                // to use the next busy check as the wake time
201                if next_wake_time.is_none_or(|wake_time| next_busy_check < wake_time) {
202                    next_wake_time = Some(next_busy_check);
203                }
204
205                let now = Instant::now();
206
207                // This client is not ready to be checked yet
208                if now < next_busy_check {
209                    continue;
210                }
211
212                // Clear next busy check timestamp (We are about to re-check it)
213                slot.next_busy_check = None;
214            }
215
216            // Check if the server is busy externally (Busy outside of our control)
217            match slot.client.is_busy().await {
218                // Server is not busy
219                Ok(false) => {
220                    debug!("obtained available server {index} for convert");
221                    return TryAcquireResult::Acquired {
222                        client: client_lock,
223                        permit: client_permit,
224                    };
225                }
226
227                // Client is busy (Externally)
228                Ok(true) => {
229                    debug!("server at {index} is busy externally");
230                }
231
232                // Erroneous clients are considered busy
233                Err(err) => {
234                    error!("failed to perform server busy check at {index}, assuming busy: {err}");
235                }
236            }
237
238            // Compute the next busy check timestamp for the client
239            let next_busy_check = Instant::now()
240                .checked_add(self.config.retry_busy_check_after)
241                .expect("time overflowed");
242            slot.next_busy_check = Some(next_busy_check);
243
244            if next_wake_time.is_none() {
245                next_wake_time = Some(next_busy_check)
246            }
247
248            // ..check the next available client
249        }
250
251        if let Some(next_wake_time) = next_wake_time {
252            TryAcquireResult::BusyExternally { next_wake_time }
253        } else {
254            TryAcquireResult::BusyInternally
255        }
256    }
257}