// fuel_core/service/vm_pool.rs
use core::{
    fmt,
    mem,
};
use fuel_core_types::fuel_vm::interpreter::MemoryInstance;
use std::sync::{
    Arc,
    Mutex,
};
use tokio::sync::OwnedSemaphorePermit;
11
/// A VM memory instance checked out from a [`MemoryPool`].
///
/// Acts as a guard: it holds one semaphore permit for as long as it is
/// alive, and its `Drop` impl returns the memory to the pool.
pub struct MemoryFromPool {
    // Handle back to the owning pool so `Drop` can recycle the memory.
    pool: MemoryPool,
    // The actual VM memory; moved out (replaced with a default) on drop.
    memory: MemoryInstance,
    // Occupies one pool slot; released automatically when this guard drops.
    _permit: OwnedSemaphorePermit,
}
19
20impl Drop for MemoryFromPool {
21 fn drop(&mut self) {
22 self.pool.recycle_raw(mem::take(&mut self.memory));
23 }
24}
25
26impl fmt::Debug for MemoryFromPool {
27 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
28 f.debug_struct("MemoryFromPool")
29 .field("pool", &"..")
30 .field("memory", &self.memory)
31 .finish()
32 }
33}
34
impl AsRef<MemoryInstance> for MemoryFromPool {
    /// Borrows the pooled memory instance.
    fn as_ref(&self) -> &MemoryInstance {
        // Delegates to MemoryInstance's own AsRef impl — presumably an
        // identity borrow; NOTE(review): confirm against fuel_vm.
        self.memory.as_ref()
    }
}
40
impl AsMut<MemoryInstance> for MemoryFromPool {
    /// Mutably borrows the pooled memory instance.
    fn as_mut(&mut self) -> &mut MemoryInstance {
        // Delegates to MemoryInstance's own AsMut impl — presumably an
        // identity borrow; NOTE(review): confirm against fuel_vm.
        self.memory.as_mut()
    }
}
46
/// Pool of reusable VM memory instances, bounded by a semaphore.
///
/// Cloning is cheap: clones share the same semaphore and free list.
#[derive(Clone)]
pub struct MemoryPool {
    // Caps how many memory instances may be checked out concurrently.
    semaphore: Arc<tokio::sync::Semaphore>,
    // Free list of reset instances ready for reuse.
    pool: Arc<Mutex<Vec<MemoryInstance>>>,
}
53impl MemoryPool {
54 pub fn new(number_of_instances: usize) -> Self {
55 Self {
56 semaphore: Arc::new(tokio::sync::Semaphore::new(number_of_instances)),
57 pool: Arc::new(Mutex::new(Vec::new())),
58 }
59 }
60
61 pub async fn take_raw(&self) -> MemoryFromPool {
63 let _permit = self
64 .semaphore
65 .clone()
66 .acquire_owned()
67 .await
68 .expect("Semaphore is not closed");
69 let mut pool = self.pool.lock().expect("poisoned");
70 let memory = pool.pop().unwrap_or_default();
71
72 MemoryFromPool {
73 pool: self.clone(),
74 memory,
75 _permit,
76 }
77 }
78
79 fn recycle_raw(&self, mut mem: MemoryInstance) {
81 mem.reset();
82 let mut pool = self.pool.lock().expect("poisoned");
83 pool.push(mem);
84 }
85}
86
87impl fmt::Debug for MemoryPool {
88 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
89 match self.pool.lock() {
90 Ok(pool) => {
91 write!(f, "SharedVmMemoryPool {{ pool: [{} items] }}", pool.len())
92 }
93 Err(_) => write!(f, "SharedVmMemoryPool {{ pool: [poisoned] }}"),
94 }
95 }
96}
97
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// A dropped guard must return its buffer to the pool, so the next
    /// take from a size-1 pool observes the very same allocation.
    #[tokio::test]
    async fn memory_pool_recycling_works() {
        let pool = MemoryPool::new(1);

        // Check out the only instance, touch its stack, and remember
        // where the stack lives.
        let mut guard = pool.take_raw().await;
        let memory = guard.as_mut();
        memory.grow_stack(1024).expect("Unable to grow stack");
        memory
            .write_bytes_noownerchecks(0, [1, 2, 3, 4])
            .expect("Unable to write stack");
        let first_ptr = memory.stack_raw() as *const _ as *const u8 as usize;
        drop(guard);

        // The recycled buffer should be handed back on the next take.
        let recycled = pool.take_raw().await;
        let second_ptr =
            recycled.as_ref().stack_raw() as *const _ as *const u8 as usize;
        assert_eq!(first_ptr, second_ptr);
    }

    /// With every slot checked out, a further take must block until a
    /// guard is dropped — here it times out instead of completing.
    #[tokio::test]
    async fn memory_pool_locking_works() {
        const POOL_SIZE: usize = 4;
        let pool = MemoryPool::new(POOL_SIZE);
        let mut held = Vec::new();
        for _ in 0..POOL_SIZE {
            held.push(pool.take_raw().await);
        }

        let attempt =
            tokio::time::timeout(Duration::from_secs(1), pool.take_raw()).await;

        assert!(attempt.is_err());
    }

    /// Dropping all guards releases their permits, so a subsequent take
    /// completes promptly.
    #[tokio::test]
    async fn memory_pool_freeing_works() {
        const POOL_SIZE: usize = 4;
        let pool = MemoryPool::new(POOL_SIZE);
        let mut held = Vec::new();
        for _ in 0..POOL_SIZE {
            held.push(pool.take_raw().await);
        }
        drop(held);

        let attempt =
            tokio::time::timeout(Duration::from_secs(1), pool.take_raw()).await;

        assert!(attempt.is_ok());
    }
}
158}