office_convert_client/
load.rs1use crate::{OfficeConvertClient, RequestError};
2use bytes::Bytes;
3use std::time::Duration;
4use tokio::{
5 sync::{Mutex, MutexGuard, Semaphore, SemaphorePermit},
6 time::{Instant, sleep_until},
7};
8use tracing::{debug, error};
9
/// Timing and retry settings for [`OfficeConvertLoadBalancer`].
#[derive(Debug, Clone)]
pub struct LoadBalancerConfig {
    /// How long a client is left alone after a busy check reported it as busy
    /// (or the busy check itself failed) before it is probed again.
    pub retry_busy_check_after: Duration,
    /// NOTE(review): not read by the code in this file; presumably the retry
    /// delay used when only a single external server exists. Confirm against
    /// the callers before relying on it.
    pub retry_single_external: Duration,
    /// NOTE(review): not read by the code in this file; presumably an upper
    /// bound on waiting for a notification. Confirm against the callers before
    /// relying on it.
    pub notify_timeout: Duration,
    /// Number of additional conversion attempts made after a retryable error
    /// before the error is returned to the caller.
    pub retry_attempts: usize,
}
21
22impl Default for LoadBalancerConfig {
23 fn default() -> Self {
24 Self {
25 retry_busy_check_after: Duration::from_secs(5),
26 retry_single_external: Duration::from_secs(1),
27 notify_timeout: Duration::from_secs(120),
28 retry_attempts: 3,
29 }
30 }
31}
32
33struct ClientSlot {
34 client: OfficeConvertClient,
36
37 next_busy_check: Option<Instant>,
40}
41
42pub struct OfficeConvertLoadBalancer {
46 clients: Vec<Mutex<ClientSlot>>,
48
49 client_permit: Semaphore,
52
53 config: LoadBalancerConfig,
55}
56
57enum TryAcquireResult<'a> {
58 Acquired {
59 client: MutexGuard<'a, ClientSlot>,
60 permit: SemaphorePermit<'a>,
61 },
62
63 BusyInternally,
65
66 BusyExternally {
68 next_wake_time: Instant,
71 },
72}
73
74impl OfficeConvertLoadBalancer {
75 pub fn new<I>(clients: I) -> Self
80 where
81 I: IntoIterator<Item = OfficeConvertClient>,
82 {
83 Self::new_with_timing(clients, Default::default())
84 }
85
86 pub fn new_with_timing<I>(clients: I, timing: LoadBalancerConfig) -> Self
93 where
94 I: IntoIterator<Item = OfficeConvertClient>,
95 {
96 let clients = clients
97 .into_iter()
98 .map(|client| {
99 Mutex::new(ClientSlot {
100 client,
101 next_busy_check: None,
102 })
103 })
104 .collect::<Vec<_>>();
105
106 let total_clients = clients.len();
107
108 Self {
109 clients,
110 client_permit: Semaphore::new(total_clients),
111 config: timing,
112 }
113 }
114
115 pub async fn convert(&self, file: Bytes) -> Result<bytes::Bytes, RequestError> {
116 let mut attempt = 0;
117
118 let error = loop {
119 let (client, _client_permit) = self.acquire_client().await;
120
121 match client.client.convert(file.clone()).await {
122 Ok(value) => return Ok(value),
123 Err(error) => {
124 if error.is_retry() {
125 tracing::error!(
126 ?error,
127 "connection error while attempting to convert, retrying"
128 );
129
130 attempt += 1;
131
132 if attempt <= self.config.retry_attempts {
133 continue;
134 }
135
136 break error;
137 }
138
139 return Err(error);
140 }
141 }
142 };
143
144 Err(error)
145 }
146
147 async fn acquire_client(&self) -> (MutexGuard<'_, ClientSlot>, SemaphorePermit<'_>) {
149 loop {
150 match self.try_acquire_client().await {
151 TryAcquireResult::Acquired { client, permit } => return (client, permit),
152
153 TryAcquireResult::BusyInternally => {
154 continue;
158 }
159
160 TryAcquireResult::BusyExternally { next_wake_time } => {
161 let now = Instant::now();
162
163 if now > next_wake_time {
165 continue;
166 }
167
168 sleep_until(next_wake_time).await;
170 }
171 }
172 }
173 }
174
175 async fn try_acquire_client(&self) -> TryAcquireResult<'_> {
178 let client_permit = self
180 .client_permit
181 .acquire()
182 .await
183 .expect("client permit was closed");
184
185 let mut next_wake_time = None;
186
187 for (index, slot) in self.clients.iter().enumerate() {
188 let mut client_lock = match slot.try_lock() {
189 Ok(client_lock) => client_lock,
190 Err(_) => continue,
192 };
193
194 let slot = &mut *client_lock;
195
196 if let Some(next_busy_check) = slot.next_busy_check {
199 if next_wake_time.is_none_or(|wake_time| next_busy_check < wake_time) {
202 next_wake_time = Some(next_busy_check);
203 }
204
205 let now = Instant::now();
206
207 if now < next_busy_check {
209 continue;
210 }
211
212 slot.next_busy_check = None;
214 }
215
216 match slot.client.is_busy().await {
218 Ok(false) => {
220 debug!("obtained available server {index} for convert");
221 return TryAcquireResult::Acquired {
222 client: client_lock,
223 permit: client_permit,
224 };
225 }
226
227 Ok(true) => {
229 debug!("server at {index} is busy externally");
230 }
231
232 Err(err) => {
234 error!("failed to perform server busy check at {index}, assuming busy: {err}");
235 }
236 }
237
238 let next_busy_check = Instant::now()
240 .checked_add(self.config.retry_busy_check_after)
241 .expect("time overflowed");
242 slot.next_busy_check = Some(next_busy_check);
243
244 if next_wake_time.is_none() {
245 next_wake_time = Some(next_busy_check)
246 }
247
248 }
250
251 if let Some(next_wake_time) = next_wake_time {
252 TryAcquireResult::BusyExternally { next_wake_time }
253 } else {
254 TryAcquireResult::BusyInternally
255 }
256 }
257}