1use std::{fmt::Debug, future::Future, marker::PhantomData, pin::Pin, sync::Arc};
16
17use foyer_common::{
18 code::{Key, Value},
19 properties::Properties,
20};
21
22use crate::{record::Record, Eviction};
23
24pub struct Piece<K, V, P> {
28 record: *const (),
29 key: *const K,
30 value: *const V,
31 hash: u64,
32 properties: *const P,
33 drop_fn: fn(*const ()),
34}
35
36impl<K, V, P> Debug for Piece<K, V, P> {
37 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38 f.debug_struct("Piece")
39 .field("record", &self.record)
40 .field("hash", &self.hash)
41 .finish()
42 }
43}
44
45unsafe impl<K, V, P> Send for Piece<K, V, P>
46where
47 K: Key,
48 V: Value,
49 P: Properties,
50{
51}
52unsafe impl<K, V, P> Sync for Piece<K, V, P>
53where
54 K: Key,
55 V: Value,
56 P: Properties,
57{
58}
59
60impl<K, V, P> Drop for Piece<K, V, P> {
61 fn drop(&mut self) {
62 (self.drop_fn)(self.record);
63 }
64}
65
66impl<K, V, P> Clone for Piece<K, V, P> {
67 fn clone(&self) -> Self {
68 unsafe { Arc::increment_strong_count(self.record) };
69 Self {
70 record: self.record,
71 key: self.key,
72 value: self.value,
73 hash: self.hash,
74 properties: self.properties,
75 drop_fn: self.drop_fn,
76 }
77 }
78}
79
80impl<K, V, P> Piece<K, V, P> {
81 pub fn new<E>(record: Arc<Record<E>>) -> Self
83 where
84 E: Eviction<Key = K, Value = V, Properties = P>,
85 {
86 let raw = Arc::into_raw(record);
87 let record = raw as *const ();
88 let key = unsafe { (*raw).key() } as *const _;
89 let value = unsafe { (*raw).value() } as *const _;
90 let hash = unsafe { (*raw).hash() };
91 let properties = unsafe { (*raw).properties() } as *const _;
92 let drop_fn = |ptr| unsafe {
93 let _ = Arc::from_raw(ptr as *const Record<E>);
94 };
95 Self {
96 record,
97 key,
98 value,
99 hash,
100 properties,
101 drop_fn,
102 }
103 }
104
105 pub fn key(&self) -> &K {
107 unsafe { &*self.key }
108 }
109
110 pub fn value(&self) -> &V {
112 unsafe { &*self.value }
113 }
114
115 pub fn hash(&self) -> u64 {
117 self.hash
118 }
119
120 pub fn properties(&self) -> &P {
122 unsafe { &*self.properties }
123 }
124
125 pub(crate) fn into_record<E>(mut self) -> Arc<Record<E>>
126 where
127 E: Eviction<Key = K, Value = V, Properties = P>,
128 {
129 self.drop_fn = |_| {};
130 let record = self.record as *const Record<E>;
131 unsafe { Arc::from_raw(record) }
132 }
133}
134
135pub type ArcPipe<K, V, P> = Arc<dyn Pipe<Key = K, Value = V, Properties = P>>;
137
138pub trait Pipe: Send + Sync + 'static + Debug {
140 type Key;
142 type Value;
144 type Properties;
146
147 fn is_enabled(&self) -> bool;
149
150 fn send(&self, piece: Piece<Self::Key, Self::Value, Self::Properties>);
152
153 fn flush(
158 &self,
159 pieces: Vec<Piece<Self::Key, Self::Value, Self::Properties>>,
160 ) -> Pin<Box<dyn Future<Output = ()> + Send>>;
161}
162
163pub struct NoopPipe<K, V, P>(PhantomData<(K, V, P)>);
165
166impl<K, V, P> Debug for NoopPipe<K, V, P> {
167 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
168 f.debug_tuple("NoopPipe").finish()
169 }
170}
171
172impl<K, V, P> Default for NoopPipe<K, V, P> {
173 fn default() -> Self {
174 Self(PhantomData)
175 }
176}
177
178impl<K, V, P> Pipe for NoopPipe<K, V, P>
179where
180 K: Key,
181 V: Value,
182 P: Properties,
183{
184 type Key = K;
185 type Value = V;
186 type Properties = P;
187
188 fn is_enabled(&self) -> bool {
189 false
190 }
191
192 fn send(&self, _: Piece<Self::Key, Self::Value, Self::Properties>) {}
193
194 fn flush(
195 &self,
196 _: Vec<Piece<Self::Key, Self::Value, Self::Properties>>,
197 ) -> Pin<Box<dyn Future<Output = ()> + Send>> {
198 Box::pin(std::future::ready(()))
199 }
200}
201
202#[cfg(test)]
203mod tests {
204 use super::*;
205 use crate::{
206 eviction::{fifo::Fifo, test_utils::TestProperties},
207 record::Data,
208 };
209
210 #[test]
211 fn test_piece() {
212 let r1 = Arc::new(Record::new(Data::<Fifo<Arc<Vec<u8>>, Arc<Vec<u8>>, TestProperties>> {
213 key: Arc::new(vec![b'k'; 4096]),
214 value: Arc::new(vec![b'k'; 16384]),
215 properties: TestProperties::default(),
216 hash: 1,
217 weight: 1,
218 }));
219
220 let p1 = Piece::new(r1.clone());
221 let k1 = p1.key().clone();
222 let r2 = r1.clone();
223 let p2 = Piece::new(r1.clone());
224
225 drop(p1);
226 drop(r2);
227 drop(p2);
228 drop(r1);
229 drop(k1);
230 }
231}