1use std::marker::PhantomPinned;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::{fmt::Debug, ptr::NonNull};
7
8use mountpoint_s3_crt_sys::{
9 aws_allocator, aws_byte_buf, aws_byte_buf_from_empty_array, aws_byte_cursor, aws_future_s3_buffer_ticket,
10 aws_future_s3_buffer_ticket_acquire, aws_future_s3_buffer_ticket_new, aws_future_s3_buffer_ticket_release,
11 aws_future_s3_buffer_ticket_set_result_by_move, aws_ref_count_init, aws_s3_buffer_pool, aws_s3_buffer_pool_config,
12 aws_s3_buffer_pool_factory_fn, aws_s3_buffer_pool_reserve_meta, aws_s3_buffer_pool_vtable, aws_s3_buffer_ticket,
13 aws_s3_buffer_ticket_vtable,
14};
15
16use crate::ToAwsByteCursor as _;
17use crate::common::allocator::Allocator;
18use crate::s3::client::MetaRequestType;
19
/// A source of memory buffers that can back a CRT S3 buffer pool.
///
/// Implementors must be cloneable and usable from multiple threads
/// (`Clone + Send + Sync`).
pub trait MemoryPool: Clone + Send + Sync {
    /// Buffer type handed out by the pool. Its contents must be reachable as a
    /// mutable byte slice.
    type Buffer: AsMut<[u8]>;

    /// Returns a buffer for a request of `size` on behalf of a meta request of
    /// type `meta_request_type`.
    ///
    /// NOTE(review): presumably `size` is in bytes and the returned buffer is
    /// at least that long — confirm against the implementations.
    fn get_buffer(&self, size: usize, meta_request_type: MetaRequestType) -> Self::Buffer;

    /// Asks the pool to trim itself.
    ///
    /// NOTE(review): the meaning of the returned `bool` is not established in
    /// this file (the CRT trim callback here ignores it) — confirm and document.
    fn trim(&self) -> bool;
}
36
37pub trait MemoryPoolFactory: Send + Sync {
39 type Pool: MemoryPool;
41
42 fn create(&self, options: MemoryPoolFactoryOptions) -> Self::Pool;
44}
45
46impl<F, P> MemoryPoolFactory for F
47where
48 F: Fn(MemoryPoolFactoryOptions) -> P + Send + Sync,
49 P: MemoryPool,
50{
51 type Pool = P;
52
53 fn create(&self, options: MemoryPoolFactoryOptions) -> Self::Pool {
54 self(options)
55 }
56}
57
/// Sizing parameters handed to a [`MemoryPoolFactory`] when a pool is created.
///
/// The fields are private; values are exposed through the read-only accessors
/// below.
///
/// NOTE(review): all three values are presumably byte counts — confirm.
#[derive(Debug)]
pub struct MemoryPoolFactoryOptions {
    part_size: usize,
    max_part_size: usize,
    memory_limit: usize,
}

impl MemoryPoolFactoryOptions {
    /// Returns the configured part size.
    pub fn part_size(&self) -> usize {
        let Self { part_size, .. } = *self;
        part_size
    }

    /// Returns the configured maximum part size.
    pub fn max_part_size(&self) -> usize {
        let Self { max_part_size, .. } = *self;
        max_part_size
    }

    /// Returns the configured memory limit.
    pub fn memory_limit(&self) -> usize {
        let Self { memory_limit, .. } = *self;
        memory_limit
    }
}
80
81#[derive(Debug, Clone)]
83pub struct CrtBufferPoolFactory(Arc<CrtBufferPoolFactoryInner>);
84
85#[derive(Debug)]
86struct CrtBufferPoolFactoryInner {
87 factory_ptr: NonNull<libc::c_void>,
88 factory_fn: aws_s3_buffer_pool_factory_fn,
89 drop_fn: fn(*mut ::libc::c_void),
90}
91
92unsafe impl Send for CrtBufferPoolFactoryInner {}
94unsafe impl Sync for CrtBufferPoolFactoryInner {}
96
97impl Drop for CrtBufferPoolFactoryInner {
98 fn drop(&mut self) {
99 (self.drop_fn)(self.factory_ptr.as_ptr());
100 }
101}
102
103impl CrtBufferPoolFactory {
104 pub fn new<PoolFactory: MemoryPoolFactory>(pool_factory: PoolFactory) -> Self {
106 let factory = Box::pin(pool_factory);
107 let leaked = Box::leak(unsafe { Pin::into_inner_unchecked(factory) });
110 let factory_ptr = unsafe { NonNull::new_unchecked(leaked as *mut PoolFactory as *mut libc::c_void) };
112 Self(Arc::new(CrtBufferPoolFactoryInner {
113 factory_ptr,
114 factory_fn: Some(buffer_pool_factory::<PoolFactory>),
115 drop_fn: drop_pool_factory::<PoolFactory>,
116 }))
117 }
118
119 pub(crate) fn as_inner(&self) -> (aws_s3_buffer_pool_factory_fn, *mut ::libc::c_void) {
121 (self.0.factory_fn, self.0.factory_ptr.as_ptr())
122 }
123}
124
125unsafe extern "C" fn buffer_pool_factory<PoolFactory: MemoryPoolFactory>(
126 allocator: *mut aws_allocator,
127 config: aws_s3_buffer_pool_config,
128 user_data: *mut libc::c_void,
129) -> *mut aws_s3_buffer_pool {
130 let pool_factory = unsafe { &*(user_data as *mut PoolFactory) };
132
133 let allocator = unsafe { NonNull::new_unchecked(allocator).into() };
135
136 let options = MemoryPoolFactoryOptions {
137 part_size: config.part_size,
138 max_part_size: config.max_part_size,
139 memory_limit: config.memory_limit,
140 };
141 let pool = pool_factory.create(options);
142
143 let crt_pool = CrtBufferPool::new(pool.clone(), allocator);
144
145 unsafe { crt_pool.leak() }
147}
148
149fn drop_pool_factory<PoolFactory: MemoryPoolFactory>(factory_ptr: *mut libc::c_void) {
150 _ = unsafe { Pin::new_unchecked(Box::from_raw(factory_ptr as *mut PoolFactory)) };
152}
153
154struct CrtBufferPool<Pool: MemoryPool> {
164 inner: aws_s3_buffer_pool,
166 pool: Pool,
168 pool_vtable: aws_s3_buffer_pool_vtable,
170 ticket_vtable: aws_s3_buffer_ticket_vtable,
172 allocator: Allocator,
174 _pinned: PhantomPinned,
176}
177
178impl<Pool: MemoryPool> CrtBufferPool<Pool> {
179 fn new(pool: Pool, allocator: Allocator) -> Pin<Box<Self>> {
180 let mut crt_pool = Box::pin(CrtBufferPool {
182 inner: Default::default(),
183 pool,
184 pool_vtable: aws_s3_buffer_pool_vtable {
185 reserve: Some(pool_reserve::<Pool>),
186 trim: Some(pool_trim::<Pool>),
187 acquire: None,
188 release: None,
189 },
190 ticket_vtable: aws_s3_buffer_ticket_vtable {
191 claim: Some(ticket_claim::<Pool::Buffer>),
192 acquire: None,
193 release: None,
194 },
195 allocator,
196 _pinned: Default::default(),
197 });
198
199 unsafe {
203 let pool_ref = Pin::get_unchecked_mut(Pin::as_mut(&mut crt_pool));
204 pool_ref.inner.vtable = &raw mut pool_ref.pool_vtable;
205 pool_ref.inner.impl_ = pool_ref as *mut CrtBufferPool<Pool> as *mut libc::c_void;
206 aws_ref_count_init(
207 &mut pool_ref.inner.ref_count,
208 &mut pool_ref.inner as *mut aws_s3_buffer_pool as *mut libc::c_void,
209 Some(pool_destroy::<Pool>),
210 );
211 }
212
213 crt_pool
214 }
215
216 unsafe fn leak(self: Pin<Box<Self>>) -> *mut aws_s3_buffer_pool {
222 let pool = Box::leak(unsafe { Pin::into_inner_unchecked(self) });
224 &raw mut pool.inner
225 }
226
227 unsafe fn ref_from_raw(pool: &*mut aws_s3_buffer_pool) -> &Self {
232 unsafe {
234 let impl_ptr = (**pool).impl_;
235 &*(impl_ptr as *mut Self)
236 }
237 }
238
239 unsafe fn from_raw(pool: *mut aws_s3_buffer_pool) -> Pin<Box<Self>> {
244 unsafe { Pin::new_unchecked(Box::from_raw((*pool).impl_ as *mut Self)) }
246 }
247
248 fn trim(&self) {
249 self.pool.trim();
250 }
251
252 fn reserve(&self, size: usize, meta_request_type: MetaRequestType) -> CrtTicketFuture {
253 let future = CrtTicketFuture::new(&self.allocator);
254
255 let buffer = self.pool.get_buffer(size, meta_request_type);
258 let ticket = self.make_ticket(buffer);
259 future.set(ticket);
260
261 future
262 }
263
264 fn make_ticket(&self, buffer: Pool::Buffer) -> Pin<Box<CrtTicket<Pool::Buffer>>> {
265 let mut ticket = Box::pin(CrtTicket {
267 inner: Default::default(),
268 ticket_vtable: self.ticket_vtable,
269 buffer,
270 _pinned: Default::default(),
271 });
272
273 unsafe {
277 let ticket_ref = Pin::get_unchecked_mut(Pin::as_mut(&mut ticket));
278 ticket_ref.inner.vtable = &raw mut ticket_ref.ticket_vtable;
279 ticket_ref.inner.impl_ = ticket_ref as *mut CrtTicket<Pool::Buffer> as *mut libc::c_void;
280 aws_ref_count_init(
281 &mut ticket_ref.inner.ref_count,
282 &mut ticket_ref.inner as *mut aws_s3_buffer_ticket as *mut libc::c_void,
283 Some(ticket_destroy::<Pool::Buffer>),
284 );
285 }
286
287 ticket
288 }
289}
290
291unsafe extern "C" fn pool_reserve<Pool: MemoryPool>(
292 pool: *mut aws_s3_buffer_pool,
293 meta: aws_s3_buffer_pool_reserve_meta,
294) -> *mut aws_future_s3_buffer_ticket {
295 let crt_pool = unsafe { CrtBufferPool::<Pool>::ref_from_raw(&pool) };
297
298 let request_type = unsafe { (*meta.meta_request).type_ };
300
301 let future = crt_pool.reserve(meta.size, request_type.into());
302
303 unsafe { future.into_inner_ptr() }
305}
306
307unsafe extern "C" fn pool_trim<Pool: MemoryPool>(pool: *mut aws_s3_buffer_pool) {
308 let crt_pool = unsafe { CrtBufferPool::<Pool>::ref_from_raw(&pool) };
310 crt_pool.trim();
311}
312
313unsafe extern "C" fn pool_destroy<Pool: MemoryPool>(data: *mut libc::c_void) {
314 let pool = data as *mut aws_s3_buffer_pool;
315
316 _ = unsafe { CrtBufferPool::<Pool>::from_raw(pool) };
318}
319
320struct CrtTicket<Buffer: AsMut<[u8]>> {
322 inner: aws_s3_buffer_ticket,
324 ticket_vtable: aws_s3_buffer_ticket_vtable,
326 buffer: Buffer,
328 _pinned: PhantomPinned,
330}
331
332impl<Buffer: AsMut<[u8]>> CrtTicket<Buffer> {
333 unsafe fn leak(self: Pin<Box<Self>>) -> *mut aws_s3_buffer_ticket {
339 let boxed = unsafe { Pin::into_inner_unchecked(self) };
341 let pool = Box::leak(boxed);
342 &raw mut pool.inner
343 }
344
345 unsafe fn ref_mut_from_raw(ticket: &mut *mut aws_s3_buffer_ticket) -> &mut Self {
350 unsafe {
352 let impl_ptr = (**ticket).impl_;
353 &mut *(impl_ptr as *mut Self)
354 }
355 }
356
357 unsafe fn from_raw(ticket: *mut aws_s3_buffer_ticket) -> Pin<Box<Self>> {
362 unsafe { Pin::new_unchecked(Box::from_raw((*ticket).impl_ as *mut Self)) }
364 }
365}
366
367unsafe extern "C" fn ticket_claim<Buffer: AsMut<[u8]>>(mut ticket: *mut aws_s3_buffer_ticket) -> aws_byte_buf {
369 let ticket = unsafe { CrtTicket::<Buffer>::ref_mut_from_raw(&mut ticket) };
371
372 let aws_byte_cursor { len, ptr } = unsafe { ticket.buffer.as_mut().as_aws_byte_cursor() };
374
375 unsafe { aws_byte_buf_from_empty_array(ptr as *mut libc::c_void, len) }
378}
379
380unsafe extern "C" fn ticket_destroy<Buffer: AsMut<[u8]>>(data: *mut libc::c_void) {
381 let ticket = data as *mut aws_s3_buffer_ticket;
382 _ = unsafe { CrtTicket::<Buffer>::from_raw(ticket) };
384}
385
386#[derive(Debug)]
388struct CrtTicketFuture {
389 inner: *mut aws_future_s3_buffer_ticket,
390}
391
392unsafe impl Send for CrtTicketFuture {}
394
395unsafe impl Sync for CrtTicketFuture {}
397
398impl CrtTicketFuture {
399 fn new(allocator: &Allocator) -> Self {
400 let inner = unsafe { aws_future_s3_buffer_ticket_new(allocator.inner.as_ptr()) };
402 Self { inner }
403 }
404
405 fn set<Buffer: AsMut<[u8]>>(&self, ticket: Pin<Box<CrtTicket<Buffer>>>) {
406 let mut ticket = unsafe { ticket.leak() };
408 unsafe {
410 aws_future_s3_buffer_ticket_set_result_by_move(self.inner, &mut ticket);
411 }
412 }
413
414 unsafe fn into_inner_ptr(mut self) -> *mut aws_future_s3_buffer_ticket {
419 std::mem::replace(&mut self.inner, std::ptr::null_mut())
421 }
422}
423
424impl Clone for CrtTicketFuture {
425 fn clone(&self) -> Self {
426 let inner = unsafe { aws_future_s3_buffer_ticket_acquire(self.inner) };
429 Self { inner }
430 }
431}
432
433impl Drop for CrtTicketFuture {
434 fn drop(&mut self) {
435 unsafe { aws_future_s3_buffer_ticket_release(self.inner) };
438 }
439}