1use std::{fmt::Debug, future::Future, marker::PhantomData, pin::Pin, sync::Arc};
16
17use foyer_common::{
18 code::{Key, Value},
19 properties::Properties,
20};
21
22use crate::{record::Record, Eviction};
23
24pub struct Piece<K, V, P> {
28 record: *const (),
29 key: *const K,
30 value: *const V,
31 hash: u64,
32 properties: *const P,
33 drop_fn: fn(*const ()),
34}
35
36impl<K, V, P> Debug for Piece<K, V, P> {
37 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38 f.debug_struct("Piece")
39 .field("record", &self.record)
40 .field("hash", &self.hash)
41 .finish()
42 }
43}
44
45unsafe impl<K, V, P> Send for Piece<K, V, P>
46where
47 K: Key,
48 V: Value,
49 P: Properties,
50{
51}
52unsafe impl<K, V, P> Sync for Piece<K, V, P>
53where
54 K: Key,
55 V: Value,
56 P: Properties,
57{
58}
59
60impl<K, V, P> Drop for Piece<K, V, P> {
61 fn drop(&mut self) {
62 (self.drop_fn)(self.record);
63 }
64}
65
66impl<K, V, P> Piece<K, V, P> {
67 pub fn new<E>(record: Arc<Record<E>>) -> Self
69 where
70 E: Eviction<Key = K, Value = V, Properties = P>,
71 {
72 let raw = Arc::into_raw(record);
73 let record = raw as *const ();
74 let key = unsafe { (*raw).key() } as *const _;
75 let value = unsafe { (*raw).value() } as *const _;
76 let hash = unsafe { (*raw).hash() };
77 let properties = unsafe { (*raw).properties() } as *const _;
78 let drop_fn = |ptr| unsafe {
79 let _ = Arc::from_raw(ptr as *const Record<E>);
80 };
81 Self {
82 record,
83 key,
84 value,
85 hash,
86 properties,
87 drop_fn,
88 }
89 }
90
91 pub fn key(&self) -> &K {
93 unsafe { &*self.key }
94 }
95
96 pub fn value(&self) -> &V {
98 unsafe { &*self.value }
99 }
100
101 pub fn hash(&self) -> u64 {
103 self.hash
104 }
105
106 pub fn properties(&self) -> &P {
108 unsafe { &*self.properties }
109 }
110}
111
112pub trait Pipe: Send + Sync + 'static + Debug {
114 type Key;
116 type Value;
118 type Properties;
120
121 fn is_enabled(&self) -> bool;
123
124 fn send(&self, piece: Piece<Self::Key, Self::Value, Self::Properties>);
126
127 fn flush(
132 &self,
133 pieces: Vec<Piece<Self::Key, Self::Value, Self::Properties>>,
134 ) -> Pin<Box<dyn Future<Output = ()> + Send>>;
135}
136
137pub struct NoopPipe<K, V, P>(PhantomData<(K, V, P)>);
139
140impl<K, V, P> Debug for NoopPipe<K, V, P> {
141 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
142 f.debug_tuple("NoopPipe").finish()
143 }
144}
145
146impl<K, V, P> Default for NoopPipe<K, V, P> {
147 fn default() -> Self {
148 Self(PhantomData)
149 }
150}
151
152impl<K, V, P> Pipe for NoopPipe<K, V, P>
153where
154 K: Key,
155 V: Value,
156 P: Properties,
157{
158 type Key = K;
159 type Value = V;
160 type Properties = P;
161
162 fn is_enabled(&self) -> bool {
163 false
164 }
165
166 fn send(&self, _: Piece<Self::Key, Self::Value, Self::Properties>) {}
167
168 fn flush(
169 &self,
170 _: Vec<Piece<Self::Key, Self::Value, Self::Properties>>,
171 ) -> Pin<Box<dyn Future<Output = ()> + Send>> {
172 Box::pin(async {})
173 }
174}
175
176#[cfg(test)]
177mod tests {
178 use super::*;
179 use crate::{
180 eviction::{fifo::Fifo, test_utils::TestProperties},
181 record::Data,
182 };
183
184 #[test]
185 fn test_piece() {
186 let r1 = Arc::new(Record::new(Data::<Fifo<Arc<Vec<u8>>, Arc<Vec<u8>>, TestProperties>> {
187 key: Arc::new(vec![b'k'; 4096]),
188 value: Arc::new(vec![b'k'; 16384]),
189 properties: TestProperties::default(),
190 hash: 1,
191 weight: 1,
192 }));
193
194 let p1 = Piece::new(r1.clone());
195 let k1 = p1.key().clone();
196 let r2 = r1.clone();
197 let p2 = Piece::new(r1.clone());
198
199 drop(p1);
200 drop(r2);
201 drop(p2);
202 drop(r1);
203 drop(k1);
204 }
205}