foyer_memory/
pipe.rs

1// Copyright 2025 foyer Project Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{fmt::Debug, future::Future, marker::PhantomData, pin::Pin, sync::Arc};
16
17use foyer_common::{
18    code::{Key, Value},
19    properties::Properties,
20};
21
22use crate::{record::Record, Eviction};
23
24/// A piece of record that is irrelevant to the eviction algorithm.
25///
26/// With [`Piece`], the disk cache doesn't need to consider the eviction generic type.
27pub struct Piece<K, V, P> {
28    record: *const (),
29    key: *const K,
30    value: *const V,
31    hash: u64,
32    properties: *const P,
33    drop_fn: fn(*const ()),
34}
35
36impl<K, V, P> Debug for Piece<K, V, P> {
37    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38        f.debug_struct("Piece")
39            .field("record", &self.record)
40            .field("hash", &self.hash)
41            .finish()
42    }
43}
44
45unsafe impl<K, V, P> Send for Piece<K, V, P>
46where
47    K: Key,
48    V: Value,
49    P: Properties,
50{
51}
52unsafe impl<K, V, P> Sync for Piece<K, V, P>
53where
54    K: Key,
55    V: Value,
56    P: Properties,
57{
58}
59
60impl<K, V, P> Drop for Piece<K, V, P> {
61    fn drop(&mut self) {
62        (self.drop_fn)(self.record);
63    }
64}
65
66impl<K, V, P> Piece<K, V, P> {
67    /// Create a record piece from an record wrapped by [`Arc`].
68    pub fn new<E>(record: Arc<Record<E>>) -> Self
69    where
70        E: Eviction<Key = K, Value = V, Properties = P>,
71    {
72        let raw = Arc::into_raw(record);
73        let record = raw as *const ();
74        let key = unsafe { (*raw).key() } as *const _;
75        let value = unsafe { (*raw).value() } as *const _;
76        let hash = unsafe { (*raw).hash() };
77        let properties = unsafe { (*raw).properties() } as *const _;
78        let drop_fn = |ptr| unsafe {
79            let _ = Arc::from_raw(ptr as *const Record<E>);
80        };
81        Self {
82            record,
83            key,
84            value,
85            hash,
86            properties,
87            drop_fn,
88        }
89    }
90
91    /// Get the key of the record.
92    pub fn key(&self) -> &K {
93        unsafe { &*self.key }
94    }
95
96    /// Get the value of the record.
97    pub fn value(&self) -> &V {
98        unsafe { &*self.value }
99    }
100
101    /// Get the hash of the record.
102    pub fn hash(&self) -> u64 {
103        self.hash
104    }
105
106    /// Get the properties of the record.
107    pub fn properties(&self) -> &P {
108        unsafe { &*self.properties }
109    }
110}
111
112/// Pipe is used to notify disk cache to cache entries from the in-memory cache.
113pub trait Pipe: Send + Sync + 'static + Debug {
114    /// Type of the key of the record.
115    type Key;
116    /// Type of the value of the record.
117    type Value;
118    /// Type of the properties of the record.
119    type Properties;
120
121    /// Decide whether to send evicted entry to pipe.
122    fn is_enabled(&self) -> bool;
123
124    /// Send the piece to the disk cache.
125    fn send(&self, piece: Piece<Self::Key, Self::Value, Self::Properties>);
126
127    /// Flush all the pieces to the disk cache in a asynchronous manner.
128    ///
129    /// This function is called when the in-memory cache is flushed.
130    /// It is expected to obey the io throttle of the disk cache.
131    fn flush(
132        &self,
133        pieces: Vec<Piece<Self::Key, Self::Value, Self::Properties>>,
134    ) -> Pin<Box<dyn Future<Output = ()> + Send>>;
135}
136
137/// An no-op pipe that is never enabled.
138pub struct NoopPipe<K, V, P>(PhantomData<(K, V, P)>);
139
140impl<K, V, P> Debug for NoopPipe<K, V, P> {
141    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
142        f.debug_tuple("NoopPipe").finish()
143    }
144}
145
146impl<K, V, P> Default for NoopPipe<K, V, P> {
147    fn default() -> Self {
148        Self(PhantomData)
149    }
150}
151
152impl<K, V, P> Pipe for NoopPipe<K, V, P>
153where
154    K: Key,
155    V: Value,
156    P: Properties,
157{
158    type Key = K;
159    type Value = V;
160    type Properties = P;
161
162    fn is_enabled(&self) -> bool {
163        false
164    }
165
166    fn send(&self, _: Piece<Self::Key, Self::Value, Self::Properties>) {}
167
168    fn flush(
169        &self,
170        _: Vec<Piece<Self::Key, Self::Value, Self::Properties>>,
171    ) -> Pin<Box<dyn Future<Output = ()> + Send>> {
172        Box::pin(async {})
173    }
174}
175
176#[cfg(test)]
177mod tests {
178    use super::*;
179    use crate::{
180        eviction::{fifo::Fifo, test_utils::TestProperties},
181        record::Data,
182    };
183
184    #[test]
185    fn test_piece() {
186        let r1 = Arc::new(Record::new(Data::<Fifo<Arc<Vec<u8>>, Arc<Vec<u8>>, TestProperties>> {
187            key: Arc::new(vec![b'k'; 4096]),
188            value: Arc::new(vec![b'k'; 16384]),
189            properties: TestProperties::default(),
190            hash: 1,
191            weight: 1,
192        }));
193
194        let p1 = Piece::new(r1.clone());
195        let k1 = p1.key().clone();
196        let r2 = r1.clone();
197        let p2 = Piece::new(r1.clone());
198
199        drop(p1);
200        drop(r2);
201        drop(p2);
202        drop(r1);
203        drop(k1);
204    }
205}