foyer_memory/
pipe.rs

1// Copyright 2026 foyer Project Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{fmt::Debug, future::Future, marker::PhantomData, pin::Pin, sync::Arc};
16
17use foyer_common::{
18    code::{Key, Value},
19    properties::Properties,
20};
21
22use crate::{record::Record, Eviction};
23
24/// A piece of record that is irrelevant to the eviction algorithm.
25///
26/// With [`Piece`], the disk cache doesn't need to consider the eviction generic type.
27pub struct Piece<K, V, P> {
28    record: *const (),
29    key: *const K,
30    value: *const V,
31    hash: u64,
32    properties: *const P,
33    drop_fn: fn(*const ()),
34}
35
36impl<K, V, P> Debug for Piece<K, V, P> {
37    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38        f.debug_struct("Piece")
39            .field("record", &self.record)
40            .field("hash", &self.hash)
41            .finish()
42    }
43}
44
45unsafe impl<K, V, P> Send for Piece<K, V, P>
46where
47    K: Key,
48    V: Value,
49    P: Properties,
50{
51}
52unsafe impl<K, V, P> Sync for Piece<K, V, P>
53where
54    K: Key,
55    V: Value,
56    P: Properties,
57{
58}
59
60impl<K, V, P> Drop for Piece<K, V, P> {
61    fn drop(&mut self) {
62        (self.drop_fn)(self.record);
63    }
64}
65
66impl<K, V, P> Clone for Piece<K, V, P> {
67    fn clone(&self) -> Self {
68        unsafe { Arc::increment_strong_count(self.record) };
69        Self {
70            record: self.record,
71            key: self.key,
72            value: self.value,
73            hash: self.hash,
74            properties: self.properties,
75            drop_fn: self.drop_fn,
76        }
77    }
78}
79
80impl<K, V, P> Piece<K, V, P> {
81    /// Create a record piece from a record wrapped by [`Arc`].
82    pub fn new<E>(record: Arc<Record<E>>) -> Self
83    where
84        E: Eviction<Key = K, Value = V, Properties = P>,
85    {
86        let raw = Arc::into_raw(record);
87        let record = raw as *const ();
88        let key = unsafe { (*raw).key() } as *const _;
89        let value = unsafe { (*raw).value() } as *const _;
90        let hash = unsafe { (*raw).hash() };
91        let properties = unsafe { (*raw).properties() } as *const _;
92        let drop_fn = |ptr| unsafe {
93            let _ = Arc::from_raw(ptr as *const Record<E>);
94        };
95        Self {
96            record,
97            key,
98            value,
99            hash,
100            properties,
101            drop_fn,
102        }
103    }
104
105    /// Get the key of the record.
106    pub fn key(&self) -> &K {
107        unsafe { &*self.key }
108    }
109
110    /// Get the value of the record.
111    pub fn value(&self) -> &V {
112        unsafe { &*self.value }
113    }
114
115    /// Get the hash of the record.
116    pub fn hash(&self) -> u64 {
117        self.hash
118    }
119
120    /// Get the properties of the record.
121    pub fn properties(&self) -> &P {
122        unsafe { &*self.properties }
123    }
124
125    pub(crate) fn into_record<E>(mut self) -> Arc<Record<E>>
126    where
127        E: Eviction<Key = K, Value = V, Properties = P>,
128    {
129        self.drop_fn = |_| {};
130        let record = self.record as *const Record<E>;
131        unsafe { Arc::from_raw(record) }
132    }
133}
134
135/// An Arc-wrapped dynamic [`Pipe`] impl.
136pub type ArcPipe<K, V, P> = Arc<dyn Pipe<Key = K, Value = V, Properties = P>>;
137
138/// Pipe is used to notify disk cache to cache entries from the in-memory cache.
139pub trait Pipe: Send + Sync + 'static + Debug {
140    /// Type of the key of the record.
141    type Key;
142    /// Type of the value of the record.
143    type Value;
144    /// Type of the properties of the record.
145    type Properties;
146
147    /// Decide whether to send evicted entry to pipe.
148    fn is_enabled(&self) -> bool;
149
150    /// Send the piece to the disk cache.
151    fn send(&self, piece: Piece<Self::Key, Self::Value, Self::Properties>);
152
153    /// Flush all the pieces to the disk cache in an asynchronous manner.
154    ///
155    /// This function is called when the in-memory cache is flushed.
156    /// It is expected to obey the io throttle of the disk cache.
157    fn flush(
158        &self,
159        pieces: Vec<Piece<Self::Key, Self::Value, Self::Properties>>,
160    ) -> Pin<Box<dyn Future<Output = ()> + Send>>;
161}
162
163/// A no-op pipe that is never enabled.
164pub struct NoopPipe<K, V, P>(PhantomData<(K, V, P)>);
165
166impl<K, V, P> Debug for NoopPipe<K, V, P> {
167    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
168        f.debug_tuple("NoopPipe").finish()
169    }
170}
171
172impl<K, V, P> Default for NoopPipe<K, V, P> {
173    fn default() -> Self {
174        Self(PhantomData)
175    }
176}
177
178impl<K, V, P> Pipe for NoopPipe<K, V, P>
179where
180    K: Key,
181    V: Value,
182    P: Properties,
183{
184    type Key = K;
185    type Value = V;
186    type Properties = P;
187
188    fn is_enabled(&self) -> bool {
189        false
190    }
191
192    fn send(&self, _: Piece<Self::Key, Self::Value, Self::Properties>) {}
193
194    fn flush(
195        &self,
196        _: Vec<Piece<Self::Key, Self::Value, Self::Properties>>,
197    ) -> Pin<Box<dyn Future<Output = ()> + Send>> {
198        Box::pin(std::future::ready(()))
199    }
200}
201
202#[cfg(test)]
203mod tests {
204    use super::*;
205    use crate::{
206        eviction::{fifo::Fifo, test_utils::TestProperties},
207        record::Data,
208    };
209
210    #[test]
211    fn test_piece() {
212        let r1 = Arc::new(Record::new(Data::<Fifo<Arc<Vec<u8>>, Arc<Vec<u8>>, TestProperties>> {
213            key: Arc::new(vec![b'k'; 4096]),
214            value: Arc::new(vec![b'k'; 16384]),
215            properties: TestProperties::default(),
216            hash: 1,
217            weight: 1,
218        }));
219
220        let p1 = Piece::new(r1.clone());
221        let k1 = p1.key().clone();
222        let r2 = r1.clone();
223        let p2 = Piece::new(r1.clone());
224
225        drop(p1);
226        drop(r2);
227        drop(p2);
228        drop(r1);
229        drop(k1);
230    }
231}