shoal_derive/lib.rs
1extern crate proc_macro;
2
3use darling::FromAttributes;
4use proc_macro::TokenStream;
5use quote::{format_ident, quote};
6use syn::{Data, DataStruct, Fields, Ident};
7
8/// The arguments for a FromShoal derive
9#[derive(Debug, darling::FromAttributes)]
10#[darling(attributes(shoal_table))]
11struct ShoalTable {
12 ///// The name of this table
13 //pub name: String,
14 /// The name of the database this table is in
15 pub db: String,
16}
17
18/// Extend a token stream with a FromShoal implementation
19///
20/// # Arguments
21///
22/// * `stream` - The stream to extend
23/// * `name` - The name of the type we are extending
24/// * `db_name` - The name of the database
25/// * `response_name` - The name of the response type
26fn add_from_shoal(
27 stream: &mut proc_macro2::TokenStream,
28 name: &Ident,
29 db_name: &Ident,
30 response_name: &Ident,
31) {
32 // extend our token stream
33 stream.extend(
34 quote! {
35 #[automatically_derived]
36 impl shoal_core::FromShoal<#db_name> for #name {
37 type ResponseKinds = <#db_name as shoal_core::shared::traits::ShoalDatabase>::ResponseKinds;
38
39 fn retrieve(kind: #response_name) -> Result<Option<Vec<Self>>, shoal_core::client::Errors> {
40 // make sure its the right data kind
41 if let #response_name::#name(action) = kind {
42 // make sure its a get action
43 if let shoal_core::shared::responses::ResponseAction::Get(rows) = action.data {
44 return Ok(rows);
45 }
46 }
47 Err(shoal_core::client::Errors::WrongType("Wrong Type!".to_owned()))
48 }
49 }
50 }
51 );
52}
53
54/// Extend a token stream with a RkyvSupport impl
55///
56/// # Arguments
57///
58/// * `stream` - The stream to extend
59/// * `name` - The name of the type we are extending
60fn add_rkyv_support(stream: &mut proc_macro2::TokenStream, name: &Ident) {
61 // extend our token stream
62 stream.extend(quote! {
63 #[automatically_derived]
64 impl shoal_core::shared::traits::RkyvSupport for #name {}
65 });
66}
67
68/// Extend a token stream with a From<#name> for *QueryKinds implementation
69///
70/// # Arguments
71///
72/// * `stream` - The stream to extend
73/// * `name` - The name of the type we are extending
74/// * `query_name` - The name of the query type
75fn add_from_for_query(stream: &mut proc_macro2::TokenStream, name: &Ident, query_name: &Ident) {
76 // extend our token stream
77 stream.extend(quote! {
78 #[automatically_derived]
79 impl From<#name> for #query_name {
80 fn from(row: #name) -> #query_name {
81 // get our rows partition key
82 let key = #name::get_partition_key(&row);
83 // build our query kind
84 #query_name::#name(Query::Insert { key, row })
85 }
86 }
87 });
88}
89
90///// Extend a token stream with a FromShoal implementation
91/////
92///// # Arguments
93/////
94///// * `stream` - The stream to extend
95///// * `name` - The name of the type we are extending
96///// * `table_name` - The name of the table
97///// * `response_name` - The name of the response type
98//fn add_shoal_table(stream: &mut proc_macro2::TokenStream, name: &Ident) {
99// // extend our token stream
100// stream.extend(quote! {
101// #[automatically_derived]
102// impl ShoalTable for #name {
103// /// The sort type for this data
104// type Sort = String;
105//
106// /// Build the sort tuple for this row
107// fn get_sort(&self) -> &Self::Sort {
108// &self.key
109// }
110//
111// /// Calculate the partition key for this row
112// fn partition_key(sort: &Self::Sort) -> u64 {
113// // create a new hasher
114// let mut hasher = GxHasher::default();
115// // hash the first key
116// hasher.write(sort.as_bytes());
117// // get our hash
118// hasher.finish()
119// }
120//
121// /// Any filters to apply when listing/crawling rows
122// type Filters = String;
123//
124// /// Determine if a row should be filtered
125// ///
126// /// # Arguments
127// ///
128// /// * `filters` - The filters to apply
129// /// * `row` - The row to filter
130// fn is_filtered(filter: &Self::Filters, row: &Self) -> bool {
131// &row.value == filter
132// }
133// }
134// });
135//}
136
137/// Derive the basic traits and functions for a type to be a table in shoal
138#[proc_macro_derive(ShoalTable, attributes(shoal_table))]
139pub fn derive_shoal_table(stream: TokenStream) -> TokenStream {
140 // parse our target struct
141 let ast = syn::parse_macro_input!(stream as syn::DeriveInput);
142 // get the name of our struct
143 let name = &ast.ident;
144 // we only support structs right now
145 match &ast.data {
146 Data::Struct(DataStruct { .. }) => (),
147 _ => unimplemented!("Only structs are currently supported"),
148 }
149 // start with an empty stream
150 let mut output = quote! {};
151 let attrs =
152 ShoalTable::from_attributes(&ast.attrs).expect("Failed to parse ShoalTable attributes");
153 // get our db and table name as a ident
154 let db_name = Ident::new(&attrs.db, name.span());
155 //let table_name = Ident::new(&attrs.name, name.span());
156 // build the name of our kinds
157 let query_name = syn::Ident::new(&format!("{}QueryKinds", db_name), name.span());
158 let response_name = syn::Ident::new(&format!("{}ResponseKinds", db_name), name.span());
159 // extend this type
160 add_from_shoal(&mut output, name, &db_name, &response_name);
161 add_rkyv_support(&mut output, name);
162 add_from_for_query(&mut output, name, &query_name);
163 //add_shoal_table(&mut output, name);
164 // convert and return our stream
165 output.into()
166}
167
168fn add_query_kinds(name: &Ident) -> proc_macro2::TokenStream {
169 // build the query kinds type to set
170 quote!(
171 type QueryKinds = concat!(#name, QueryKinds);
172 )
173}
174
175fn add_db_new(name: &Ident, fields: &Fields) -> proc_macro2::TokenStream {
176 // build the entry for each field name and type
177 let field_iter = fields.iter().map(|field| {
178 // get this fields name
179 let ident = &field.ident;
180 let ftype = format_ident!("{}", stringify!(field.ty).split_once('<').unwrap().0);
181 // build this fields entry
182 quote! { #ident: #ftype::new(shared_name, conf).await? }
183 });
184 // build the query kinds type to set
185 quote!(
186 async fn new(shard_name: &str, conf: &Conf) -> Result<Self, ServerError> {
187 let db = #name {
188 // key_value: PersistentTable::new(shard_name, conf).await?,
189 #(#field_iter,)*
190 };
191 Ok(db)
192 }
193 )
194}
195
196fn add_db_trait(name: &Ident, fields: &Fields, stream: &mut proc_macro2::TokenStream) {
197 // build our new idents
198 let query_kinds = format_ident!("{}QueryKinds", name);
199 let response_kinds = format_ident!("{}ResponseKinds", name);
200 // build our different function implementations
201 let new = add_db_new(name, fields);
202 // build the entry for each field name and type
203 let field_iter = fields.iter().map(|field| {
204 // get this fields name
205 let ident = &field.ident;
206 let ftype = format_ident!("{}", stringify!(field.ty).split_once('<').unwrap().0);
207 // build this fields entry
208 quote! { #ident: #ftype::new(shared_name, conf).await? }
209 });
210 stream.extend(quote! {
211 impl ShoalDatabase for Basic {
212 /// The different tables or types of queries we will handle
213 type QueryKinds = #query_kinds;
214
215 /// The different tables we can get responses from
216 type ResponseKinds = #response_kinds;
217
218 /// Create a new shoal db instance
219 ///
220 /// # Arguments
221 ///
222 /// * `shard_name` - The id of the shard that owns this table
223 /// * `conf` - A shoal config
224 //#new
225 async fn new(shard_name: &str, conf: &Conf) -> Result<Self, ServerError> {
226 let db = Basic {
227 key_value: PersistentTable::new(shard_name, conf).await?,
228 };
229 Ok(db)
230 }
231
232 /// Handle messages for different table types
233 async fn handle(
234 &mut self,
235 meta: QueryMetadata,
236 typed_query: Self::QueryKinds,
237 ) -> Option<(SocketAddr, Self::ResponseKinds)> {
238 // match on the right query and execute it
239 match typed_query {
240 BasicQueryKinds::KeyValue(query) => {
241 // handle these queries
242 match self.key_value.handle(meta, query).await {
243 Some((addr, response)) => {
244 // wrap our response with the right table kind
245 let wrapped = BasicResponseKinds::KeyValue(response);
246 Some((addr, wrapped))
247 }
248 None => None,
249 }
250 }
251 }
252 }
253
254 /// Flush any in flight writes to disk
255 async fn flush(&mut self) -> Result<(), ServerError> {
256 self.key_value.flush().await
257 }
258
259 /// Get all flushed messages and send their response back
260 ///
261 /// # Arguments
262 ///
263 /// * `flushed` - The flushed response to send back
264 fn handle_flushed(&mut self, flushed: &mut Vec<(SocketAddr, Self::ResponseKinds)>) {
265 // get all flushed queries in their specific format
266 let specific = self.key_value.get_flushed();
267 // wrap and add our specific queries
268 let wrapped = specific
269 .drain(..)
270 .map(|(addr, resp)| (addr, BasicResponseKinds::KeyValue(resp)));
271 // extend our response list with our wrapped queries
272 flushed.extend(wrapped);
273 }
274
275 /// Shutdown this table and flush any data to disk if needed
276 async fn shutdown(&mut self) -> Result<(), ServerError> {
277 // shutdown the key value table
278 self.key_value.shutdown().await
279 }
280 }
281
282 });
283}
284
285/// Derive the basic traits and functions for a type to be a table in shoal
286#[proc_macro_derive(ShoalDB)]
287pub fn derive_shoal_db(stream: TokenStream) -> TokenStream {
288 // parse our target struct
289 let ast = syn::parse_macro_input!(stream as syn::DeriveInput);
290 // get the name of our struct
291 let name = &ast.ident;
292 // we only support structs right now
293 let struct_data = match &ast.data {
294 Data::Struct(struct_data) => struct_data,
295 _ => unimplemented!("Only structs are currently supported"),
296 };
297 let fields = &struct_data.fields;
298 // start with an empty stream
299 let mut output = quote! {};
300 // add our shoal db trait
301 add_db_trait(name, fields, &mut output);
302 // convert and return our stream
303 output.into()
304}
305
306///// The arguments for a ShoalDB derive
307//#[derive(Debug, darling::FromAttributes)]
308//#[darling(attributes(shoal_db))]
309//struct ShoalDB {
310// /// The name of this database
311// pub name: String,
312//}
313//
314///// Extend a token stream with a ShoalDatabase implementation
315/////
316///// # Arguments
317/////
318///// * `stream` - The stream to extend
319///// * `name` - The name of the type we are extending
320///// * `table_name` - The name of the table
321///// * `response_name` - The name of the response type
322//fn add_shoal_database(
323// stream: &mut proc_macro2::TokenStream,
324// name: &Ident,
325// query_name: &Ident,
326// response_name: &Ident,
327// archived_response_name: &Ident,
328// tables: &Vec<Ident>,
329//) {
330// stream.extend(quote! {
331// #[automatically_derived]
332// impl ShoalDatabase for #name {
333// /// The different tables or types of queries we will handle
334// type QueryKinds = #query_name;
335//
336// /// The different tables we can get responses from
337// type ResponseKinds = #response_name;
338//
339// /// Deserialize our query types
340// fn unarchive(buff: &[u8]) -> Queries<Self> {
341// // try to cast this query
342// let query = shoal_core::rkyv::check_archived_root::<Queries<Self>>(buff).unwrap();
343// // deserialize it
344// query.deserialize(&mut rkyv::Infallible).unwrap()
345// }
346//
347// // Deserialize our response types
348// fn unarchive_response(buff: &[u8]) -> Self::ResponseKinds {
349// // try to cast this query
350// let query = shoal_core::rkyv::check_archived_root::<Self::ResponseKinds>(buff).unwrap();
351// // deserialize it
352// query.deserialize(&mut rkyv::Infallible).unwrap()
353// }
354//
355// // Deserialize our response types
356// fn response_query_id(buff: &[u8]) -> &Uuid {
357// // try to cast this query
358// let kinds = shoal_core::rkyv::check_archived_root::<Self::ResponseKinds>(buff).unwrap();
359// // pop our table kinds
360// shoal_core::build_response_query_id_match!(kinds, #archived_response_name, KeyValueRow)
361// }
362//
363// /// Get the index of a single [`Self::ResponseKinds`]
364// ///
365// /// # Arguments
366// ///
367// /// * `resp` - The resp to to get the order index for
368// fn response_index(resp: &Self::ResponseKinds) -> usize {
369// shoal_core::build_response_index_match!(resp, #response_name, KeyValueRow)
370// }
371//
372// /// Get whether this is the last response in a response stream
373// ///
374// /// # Arguments
375// ///
376// /// * `resp` - The response to check
377// fn is_end_of_stream(resp: &Self::ResponseKinds) -> bool {
378// shoal_core::build_response_end_match!(resp, #response_name, KeyValueRow)
379// }
380//
381// /// Forward our queries to the correct shards
382// async fn send_to_shard(
383// ring: &Ring,
384// mesh_tx: &mut Senders<MeshMsg<Self>>,
385// addr: SocketAddr,
386// queries: Queries<Self>,
387// ) -> Result<(), ServerError> {
388// let mut tmp = Vec::with_capacity(1);
389// // get the index for the last query in this bundle
390// let end_index = queries.queries.len() - 1;
391// // crawl over our queries
392// for (index, kind) in queries.queries.into_iter().enumerate() {
393// // get our target shards info
394// match &kind {
395// #query_name::KeyValueQuery(query) => {
396// // get our shards info
397// query.find_shard(ring, &mut tmp);
398// }
399// };
400// // send this query to the right shards
401// for shard_info in tmp.drain(..) {
402// match &shard_info.contact {
403// ShardContact::Local(id) => {
404// //println!("coord - sending query to shard: {id}");
405// mesh_tx
406// .send_to(
407// *id,
408// MeshMsg::Query {
409// addr,
410// id: queries.id,
411// index,
412// query: kind.clone(),
413// end: index == end_index,
414// },
415// )
416// .await
417// .unwrap();
418// }
419// };
420// }
421// }
422// Ok(())
423// }
424//
425// /// Handle messages for different table types
426// async fn handle(
427// &mut self,
428// id: Uuid,
429// index: usize,
430// typed_query: Self::QueryKinds,
431// end: bool,
432// ) -> Self::ResponseKinds {
433// // match on the right query and execute it
434// match typed_query {
435// #query_name::KeyValueQuery(query) => {
436// // handle these queries
437// #response_name::KeyValueRow(self.key_value.handle(id, index, query, end))
438// }
439// }
440// }
441// }
442// });
443//}
444
445//#[proc_macro_derive(ShoalDB, attributes(shoal_db))]
446//pub fn derive_shoal_database(stream: TokenStream) -> TokenStream {
447// // parse our target struct
448// let ast = syn::parse_macro_input!(stream as syn::DeriveInput);
449// // get the name of our struct
450// let name = &ast.ident;
451// let attrs = ShoalDB::from_attributes(&ast.attrs).expect("Failed to parse ShoalDB attributes");
452// // build the names for our different derived types
453// let query_name = syn::Ident::new(&format!("{}QueryKinds", attrs.name), name.span());
454// let response_name = syn::Ident::new(&format!("{}ResponseKinds", attrs.name), name.span());
455// let archived_response_name =
456// syn::Ident::new(&format!("Archived{}ResponseKinds", attrs.name), name.span());
457// // we only support structs right now
458// let fields = match &ast.data {
459// Data::Struct(DataStruct { fields, .. }) => fields,
460// _ => unimplemented!("Only structs are currently supported"),
461// };
462// // start with an empty stream
463// let mut output = quote! {};
464// // init a list of the right size for the tables in our database
465// let mut tables: Vec<Ident> = Vec::with_capacity(fields.len());
466// // build a list of our field identies
467// for field in fields {
468// if let syn::Type::Path(path) = &field.ty {
469// if let Some(segment) = path.path.segments.first() {
470// if let PathArguments::AngleBracketed(angled) = &segment.arguments {
471// if let Some(arg) = angled.args.first() {
472// if let GenericArgument::Type(type_arg) = arg {
473// if let syn::Type::Path(path) = type_arg {
474// if let Some(segment) = path.path.segments.first() {
475// // add our field identity
476// tables.push(segment.ident.clone());
477// }
478// }
479// }
480// }
481// }
482// }
483// }
484// }
485// // add a ShoalDatabase implementation
486// add_shoal_database(
487// &mut output,
488// name,
489// &query_name,
490// &response_name,
491// &archived_response_name,
492// &tables,
493// );
494// // convert and return our stream
495// output.into()
496//}