fuel_core/service/
vm_pool.rs

1use core::{
2    fmt,
3    mem,
4};
5use fuel_core_types::fuel_vm::interpreter::MemoryInstance;
6use std::sync::{
7    Arc,
8    Mutex,
9};
10use tokio::sync::OwnedSemaphorePermit;
11
/// Memory instance originating from a pool.
/// Will be recycled back into the pool when dropped.
pub struct MemoryFromPool {
    /// Handle to the pool that receives `memory` back on drop.
    pool: MemoryPool,
    /// The memory instance lent out to the holder of this value.
    memory: MemoryInstance,
    /// Semaphore permit kept for as long as this value is alive; it is never
    /// read, only held.
    _permit: OwnedSemaphorePermit,
}
19
20impl Drop for MemoryFromPool {
21    fn drop(&mut self) {
22        self.pool.recycle_raw(mem::take(&mut self.memory));
23    }
24}
25
26impl fmt::Debug for MemoryFromPool {
27    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
28        f.debug_struct("MemoryFromPool")
29            .field("pool", &"..")
30            .field("memory", &self.memory)
31            .finish()
32    }
33}
34
35impl AsRef<MemoryInstance> for MemoryFromPool {
36    fn as_ref(&self) -> &MemoryInstance {
37        self.memory.as_ref()
38    }
39}
40
41impl AsMut<MemoryInstance> for MemoryFromPool {
42    fn as_mut(&mut self) -> &mut MemoryInstance {
43        self.memory.as_mut()
44    }
45}
46
/// Pool of VM memory instances for reuse.
///
/// Both fields sit behind an `Arc`, so a clone is another handle to the same
/// semaphore and the same list of recycled instances.
#[derive(Clone)]
pub struct MemoryPool {
    /// Bounds how many memory instances may be taken out at the same time.
    semaphore: Arc<tokio::sync::Semaphore>,
    /// Recycled instances waiting to be handed out again.
    pool: Arc<Mutex<Vec<MemoryInstance>>>,
}
53impl MemoryPool {
54    pub fn new(number_of_instances: usize) -> Self {
55        Self {
56            semaphore: Arc::new(tokio::sync::Semaphore::new(number_of_instances)),
57            pool: Arc::new(Mutex::new(Vec::new())),
58        }
59    }
60
61    /// Gets a new raw VM memory instance from the pool.
62    pub async fn take_raw(&self) -> MemoryFromPool {
63        let _permit = self
64            .semaphore
65            .clone()
66            .acquire_owned()
67            .await
68            .expect("Semaphore is not closed");
69        let mut pool = self.pool.lock().expect("poisoned");
70        let memory = pool.pop().unwrap_or_default();
71
72        MemoryFromPool {
73            pool: self.clone(),
74            memory,
75            _permit,
76        }
77    }
78
79    /// Adds a new memory instance to the pool.
80    fn recycle_raw(&self, mut mem: MemoryInstance) {
81        mem.reset();
82        let mut pool = self.pool.lock().expect("poisoned");
83        pool.push(mem);
84    }
85}
86
87impl fmt::Debug for MemoryPool {
88    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
89        match self.pool.lock() {
90            Ok(pool) => {
91                write!(f, "SharedVmMemoryPool {{ pool: [{} items] }}", pool.len())
92            }
93            Err(_) => write!(f, "SharedVmMemoryPool {{ pool: [poisoned] }}"),
94        }
95    }
96}
97
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Address of the start of the instance's raw stack, as an integer.
    fn stack_addr(memory: &MemoryInstance) -> usize {
        memory.stack_raw() as *const _ as *const u8 as usize
    }

    /// Takes `count` instances out of `pool` and returns them still held.
    async fn take_many(pool: &MemoryPool, count: usize) -> Vec<MemoryFromPool> {
        let mut held = vec![];
        for _ in 0..count {
            held.push(pool.take_raw().await);
        }
        held
    }

    #[tokio::test]
    async fn memory_pool_recycling_works() {
        // Given
        let pool = MemoryPool::new(1);

        // When
        let mut guard = pool.take_raw().await;
        let memory = guard.as_mut();
        memory.grow_stack(1024).expect("Unable to grow stack");
        memory
            .write_bytes_noownerchecks(0, [1, 2, 3, 4])
            .expect("Unable to write stack");
        let first_addr = stack_addr(memory);
        drop(guard);

        // Then
        // The second take must hand back the very same stack allocation.
        let reused = pool.take_raw().await;
        let second_addr = stack_addr(reused.as_ref());
        assert_eq!(first_addr, second_addr);
    }

    #[tokio::test]
    async fn memory_pool_locking_works() {
        // Given
        const POOL_SIZE: usize = 4;
        let pool = MemoryPool::new(POOL_SIZE);
        // Keep every instance alive so the pool stays exhausted.
        let _held = take_many(&pool, POOL_SIZE).await;

        // When
        let attempt =
            tokio::time::timeout(Duration::from_secs(1), pool.take_raw()).await;

        // Then
        assert!(attempt.is_err());
    }

    #[tokio::test]
    async fn memory_pool_freeing_works() {
        // Given
        const POOL_SIZE: usize = 4;
        let pool = MemoryPool::new(POOL_SIZE);
        let held = take_many(&pool, POOL_SIZE).await;
        // Hand every instance back before trying again.
        drop(held);

        // When
        let attempt =
            tokio::time::timeout(Duration::from_secs(1), pool.take_raw()).await;

        // Then
        assert!(attempt.is_ok());
    }
}