foyer_memory/
pipe.rs

1// Copyright 2025 foyer Project Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{fmt::Debug, future::Future, marker::PhantomData, pin::Pin, sync::Arc};
16
17use foyer_common::{
18    code::{Key, Value},
19    properties::Properties,
20};
21
22use crate::{record::Record, Eviction};
23
24/// A piece of record that is irrelevant to the eviction algorithm.
25///
26/// With [`Piece`], the disk cache doesn't need to consider the eviction generic type.
27pub struct Piece<K, V, P> {
28    record: *const (),
29    key: *const K,
30    value: *const V,
31    hash: u64,
32    properties: *const P,
33    drop_fn: fn(*const ()),
34}
35
36impl<K, V, P> Debug for Piece<K, V, P> {
37    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38        f.debug_struct("Piece")
39            .field("record", &self.record)
40            .field("hash", &self.hash)
41            .finish()
42    }
43}
44
45unsafe impl<K, V, P> Send for Piece<K, V, P>
46where
47    K: Key,
48    V: Value,
49    P: Properties,
50{
51}
52unsafe impl<K, V, P> Sync for Piece<K, V, P>
53where
54    K: Key,
55    V: Value,
56    P: Properties,
57{
58}
59
60impl<K, V, P> Drop for Piece<K, V, P> {
61    fn drop(&mut self) {
62        (self.drop_fn)(self.record);
63    }
64}
65
66impl<K, V, P> Clone for Piece<K, V, P> {
67    fn clone(&self) -> Self {
68        unsafe { Arc::increment_strong_count(self.record) };
69        Self {
70            record: self.record,
71            key: self.key,
72            value: self.value,
73            hash: self.hash,
74            properties: self.properties,
75            drop_fn: self.drop_fn,
76        }
77    }
78}
79
80impl<K, V, P> Piece<K, V, P> {
81    /// Create a record piece from an record wrapped by [`Arc`].
82    pub fn new<E>(record: Arc<Record<E>>) -> Self
83    where
84        E: Eviction<Key = K, Value = V, Properties = P>,
85    {
86        let raw = Arc::into_raw(record);
87        let record = raw as *const ();
88        let key = unsafe { (*raw).key() } as *const _;
89        let value = unsafe { (*raw).value() } as *const _;
90        let hash = unsafe { (*raw).hash() };
91        let properties = unsafe { (*raw).properties() } as *const _;
92        let drop_fn = |ptr| unsafe {
93            let _ = Arc::from_raw(ptr as *const Record<E>);
94        };
95        Self {
96            record,
97            key,
98            value,
99            hash,
100            properties,
101            drop_fn,
102        }
103    }
104
105    /// Get the key of the record.
106    pub fn key(&self) -> &K {
107        unsafe { &*self.key }
108    }
109
110    /// Get the value of the record.
111    pub fn value(&self) -> &V {
112        unsafe { &*self.value }
113    }
114
115    /// Get the hash of the record.
116    pub fn hash(&self) -> u64 {
117        self.hash
118    }
119
120    /// Get the properties of the record.
121    pub fn properties(&self) -> &P {
122        unsafe { &*self.properties }
123    }
124
125    pub(crate) fn into_record<E>(mut self) -> Arc<Record<E>>
126    where
127        E: Eviction<Key = K, Value = V, Properties = P>,
128    {
129        self.drop_fn = |_| {};
130        let record = self.record as *const Record<E>;
131        unsafe { Arc::from_raw(record) }
132    }
133}
134
135/// Pipe is used to notify disk cache to cache entries from the in-memory cache.
136pub trait Pipe: Send + Sync + 'static + Debug {
137    /// Type of the key of the record.
138    type Key;
139    /// Type of the value of the record.
140    type Value;
141    /// Type of the properties of the record.
142    type Properties;
143
144    /// Decide whether to send evicted entry to pipe.
145    fn is_enabled(&self) -> bool;
146
147    /// Send the piece to the disk cache.
148    fn send(&self, piece: Piece<Self::Key, Self::Value, Self::Properties>);
149
150    /// Flush all the pieces to the disk cache in a asynchronous manner.
151    ///
152    /// This function is called when the in-memory cache is flushed.
153    /// It is expected to obey the io throttle of the disk cache.
154    fn flush(
155        &self,
156        pieces: Vec<Piece<Self::Key, Self::Value, Self::Properties>>,
157    ) -> Pin<Box<dyn Future<Output = ()> + Send>>;
158}
159
/// A no-op pipe that is never enabled.
///
/// The type parameters only select the associated types of the [`Pipe`] implementation; the
/// struct holds nothing but a [`PhantomData`] marker.
pub struct NoopPipe<K, V, P>(PhantomData<(K, V, P)>);
162
163impl<K, V, P> Debug for NoopPipe<K, V, P> {
164    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
165        f.debug_tuple("NoopPipe").finish()
166    }
167}
168
169impl<K, V, P> Default for NoopPipe<K, V, P> {
170    fn default() -> Self {
171        Self(PhantomData)
172    }
173}
174
175impl<K, V, P> Pipe for NoopPipe<K, V, P>
176where
177    K: Key,
178    V: Value,
179    P: Properties,
180{
181    type Key = K;
182    type Value = V;
183    type Properties = P;
184
185    fn is_enabled(&self) -> bool {
186        false
187    }
188
189    fn send(&self, _: Piece<Self::Key, Self::Value, Self::Properties>) {}
190
191    fn flush(
192        &self,
193        _: Vec<Piece<Self::Key, Self::Value, Self::Properties>>,
194    ) -> Pin<Box<dyn Future<Output = ()> + Send>> {
195        Box::pin(async {})
196    }
197}
198
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        eviction::{fifo::Fifo, test_utils::TestProperties},
        record::Data,
    };

    /// Eviction type used to instantiate the record under test.
    type TestEviction = Fifo<Arc<Vec<u8>>, Arc<Vec<u8>>, TestProperties>;

    /// Pieces and plain `Arc` handles must share ownership of one record: every piece accounts
    /// for exactly one strong reference, in whatever order the handles are released.
    #[test]
    fn test_piece() {
        let r1 = Arc::new(Record::new(Data::<TestEviction> {
            key: Arc::new(vec![b'k'; 4096]),
            value: Arc::new(vec![b'k'; 16384]),
            properties: TestProperties::default(),
            hash: 1,
            weight: 1,
        }));
        assert_eq!(Arc::strong_count(&r1), 1);

        let p1 = Piece::new(r1.clone());
        let k1 = p1.key().clone();
        let r2 = r1.clone();
        let p2 = Piece::new(r1.clone());
        assert_eq!(Arc::strong_count(&r1), 4);
        assert_eq!(p1.hash(), 1);
        assert_eq!(p2.value().len(), 16384);

        // Cloning a piece takes one more reference; converting it back hands that reference
        // over to the returned `Arc` without changing the count.
        let p3 = p2.clone();
        assert_eq!(Arc::strong_count(&r1), 5);
        let r3: Arc<Record<TestEviction>> = p3.into_record();
        assert_eq!(Arc::strong_count(&r1), 5);
        assert!(Arc::ptr_eq(&r1, &r3));
        drop(r3);
        assert_eq!(Arc::strong_count(&r1), 4);

        drop(p1);
        assert_eq!(Arc::strong_count(&r1), 3);
        drop(r2);
        assert_eq!(Arc::strong_count(&r1), 2);
        drop(p2);
        assert_eq!(Arc::strong_count(&r1), 1);
        drop(r1);

        // The key was cloned out of the piece, so it must outlive the record itself.
        assert_eq!(k1.len(), 4096);
        drop(k1);
    }
}