1use std::{fmt::Debug, future::Future, marker::PhantomData, pin::Pin, sync::Arc};
16
17use foyer_common::{
18 code::{Key, Value},
19 properties::Properties,
20};
21
22use crate::{record::Record, Eviction};
23
24pub struct Piece<K, V, P> {
28 record: *const (),
29 key: *const K,
30 value: *const V,
31 hash: u64,
32 properties: *const P,
33 drop_fn: fn(*const ()),
34}
35
36impl<K, V, P> Debug for Piece<K, V, P> {
37 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38 f.debug_struct("Piece")
39 .field("record", &self.record)
40 .field("hash", &self.hash)
41 .finish()
42 }
43}
44
45unsafe impl<K, V, P> Send for Piece<K, V, P>
46where
47 K: Key,
48 V: Value,
49 P: Properties,
50{
51}
52unsafe impl<K, V, P> Sync for Piece<K, V, P>
53where
54 K: Key,
55 V: Value,
56 P: Properties,
57{
58}
59
60impl<K, V, P> Drop for Piece<K, V, P> {
61 fn drop(&mut self) {
62 (self.drop_fn)(self.record);
63 }
64}
65
66impl<K, V, P> Clone for Piece<K, V, P> {
67 fn clone(&self) -> Self {
68 unsafe { Arc::increment_strong_count(self.record) };
69 Self {
70 record: self.record,
71 key: self.key,
72 value: self.value,
73 hash: self.hash,
74 properties: self.properties,
75 drop_fn: self.drop_fn,
76 }
77 }
78}
79
80impl<K, V, P> Piece<K, V, P> {
81 pub fn new<E>(record: Arc<Record<E>>) -> Self
83 where
84 E: Eviction<Key = K, Value = V, Properties = P>,
85 {
86 let raw = Arc::into_raw(record);
87 let record = raw as *const ();
88 let key = unsafe { (*raw).key() } as *const _;
89 let value = unsafe { (*raw).value() } as *const _;
90 let hash = unsafe { (*raw).hash() };
91 let properties = unsafe { (*raw).properties() } as *const _;
92 let drop_fn = |ptr| unsafe {
93 let _ = Arc::from_raw(ptr as *const Record<E>);
94 };
95 Self {
96 record,
97 key,
98 value,
99 hash,
100 properties,
101 drop_fn,
102 }
103 }
104
105 pub fn key(&self) -> &K {
107 unsafe { &*self.key }
108 }
109
110 pub fn value(&self) -> &V {
112 unsafe { &*self.value }
113 }
114
115 pub fn hash(&self) -> u64 {
117 self.hash
118 }
119
120 pub fn properties(&self) -> &P {
122 unsafe { &*self.properties }
123 }
124
125 pub(crate) fn into_record<E>(mut self) -> Arc<Record<E>>
126 where
127 E: Eviction<Key = K, Value = V, Properties = P>,
128 {
129 self.drop_fn = |_| {};
130 let record = self.record as *const Record<E>;
131 unsafe { Arc::from_raw(record) }
132 }
133}
134
/// Interface through which [`Piece`]s are handed over to a consumer.
///
/// NOTE(review): presumably the consumer is a lower cache tier fed by the in-memory cache —
/// confirm against the implementors and call sites.
pub trait Pipe: Send + Sync + 'static + Debug {
    /// Key type of the pieces accepted by this pipe.
    type Key;
    /// Value type of the pieces accepted by this pipe.
    type Value;
    /// Properties type of the pieces accepted by this pipe.
    type Properties;

    /// Reports whether the pipe is enabled.
    ///
    /// NOTE(review): callers presumably skip `send`/`flush` when this returns `false` — confirm.
    fn is_enabled(&self) -> bool;

    /// Hands a single piece to the pipe, transferring ownership of it.
    fn send(&self, piece: Piece<Self::Key, Self::Value, Self::Properties>);

    /// Hands a batch of pieces to the pipe, transferring ownership of them.
    ///
    /// Returns a boxed, `Send` future that resolves to `()`.
    ///
    /// NOTE(review): presumably the future completes once the batch has been processed — confirm
    /// against the implementors.
    fn flush(
        &self,
        pieces: Vec<Piece<Self::Key, Self::Value, Self::Properties>>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send>>;
}
159
/// A pipe that does nothing.
///
/// It carries no data; the type parameters only fix the key, value and properties types.
pub struct NoopPipe<K, V, P>(PhantomData<(K, V, P)>);

impl<K, V, P> Debug for NoopPipe<K, V, P> {
    /// Writes the bare type name; there are no fields to show.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        formatter.write_str("NoopPipe")
    }
}

impl<K, V, P> Default for NoopPipe<K, V, P> {
    /// Creates a new no-op pipe without requiring `K`, `V` or `P` to be `Default`.
    fn default() -> Self {
        NoopPipe(PhantomData)
    }
}
174
175impl<K, V, P> Pipe for NoopPipe<K, V, P>
176where
177 K: Key,
178 V: Value,
179 P: Properties,
180{
181 type Key = K;
182 type Value = V;
183 type Properties = P;
184
185 fn is_enabled(&self) -> bool {
186 false
187 }
188
189 fn send(&self, _: Piece<Self::Key, Self::Value, Self::Properties>) {}
190
191 fn flush(
192 &self,
193 _: Vec<Piece<Self::Key, Self::Value, Self::Properties>>,
194 ) -> Pin<Box<dyn Future<Output = ()> + Send>> {
195 Box::pin(async {})
196 }
197}
198
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        eviction::{fifo::Fifo, test_utils::TestProperties},
        record::Data,
    };

    /// Eviction type used by every record in these tests.
    type TestEviction = Fifo<Arc<Vec<u8>>, Arc<Vec<u8>>, TestProperties>;

    /// Builds a fresh, uniquely owned record for the tests below.
    fn record_for_test() -> Arc<Record<TestEviction>> {
        Arc::new(Record::new(Data::<TestEviction> {
            key: Arc::new(vec![b'k'; 4096]),
            value: Arc::new(vec![b'k'; 16384]),
            properties: TestProperties::default(),
            hash: 1,
            weight: 1,
        }))
    }

    /// Each piece must hold exactly one strong reference and release it on drop, in any
    /// interleaving with plain `Arc` clones of the same record.
    #[test]
    fn test_piece() {
        let r1 = record_for_test();
        assert_eq!(Arc::strong_count(&r1), 1);

        let p1 = Piece::new(r1.clone());
        assert_eq!(Arc::strong_count(&r1), 2);
        let k1 = p1.key().clone();
        let r2 = r1.clone();
        let p2 = Piece::new(r1.clone());
        assert_eq!(Arc::strong_count(&r1), 4);

        drop(p1);
        assert_eq!(Arc::strong_count(&r1), 3);
        drop(r2);
        assert_eq!(Arc::strong_count(&r1), 2);
        drop(p2);
        assert_eq!(Arc::strong_count(&r1), 1);
        drop(r1);
        // The cloned key is its own `Arc` and must outlive the record it came from.
        drop(k1);
    }

    /// Cloning a piece must take its own strong reference, and `into_record` must hand the
    /// reference back without releasing or duplicating it.
    #[test]
    fn test_piece_clone_and_into_record() {
        let record = record_for_test();

        let piece = Piece::new(record.clone());
        let cloned = piece.clone();
        assert_eq!(Arc::strong_count(&record), 3);

        drop(piece);
        assert_eq!(Arc::strong_count(&record), 2);

        let restored = cloned.into_record::<TestEviction>();
        assert!(Arc::ptr_eq(&restored, &record));
        assert_eq!(Arc::strong_count(&record), 2);

        drop(restored);
        assert_eq!(Arc::strong_count(&record), 1);
    }
}