shoal_derive/
lib.rs

1extern crate proc_macro;
2
3use darling::FromAttributes;
4use proc_macro::TokenStream;
5use quote::{format_ident, quote};
6use syn::{Data, DataStruct, Fields, Ident};
7
/// The arguments accepted by the `#[shoal_table(...)]` helper attribute
///
/// These are parsed by darling from the attributes on the type that the
/// `ShoalTable` derive is applied to.
#[derive(Debug, darling::FromAttributes)]
#[darling(attributes(shoal_table))]
struct ShoalTable {
    // NOTE: a per-table `name` argument is currently disabled
    ///// The name of this table
    //pub name: String,
    /// The name of the database this table is in
    ///
    /// The derive turns this string into the database type's ident and uses
    /// it as the prefix for the `*QueryKinds`/`*ResponseKinds` type names.
    pub db: String,
}
17
18/// Extend a token stream with a FromShoal implementation
19///
20/// # Arguments
21///
22/// * `stream` - The stream to extend
23/// * `name` - The name of the type we are extending
24/// * `db_name` - The name of the database
25/// * `response_name` - The name of the response type
26fn add_from_shoal(
27    stream: &mut proc_macro2::TokenStream,
28    name: &Ident,
29    db_name: &Ident,
30    response_name: &Ident,
31) {
32    // extend our token stream
33    stream.extend(
34        quote! {
35            #[automatically_derived]
36            impl shoal_core::FromShoal<#db_name> for #name  {
37                type ResponseKinds = <#db_name as shoal_core::shared::traits::ShoalDatabase>::ResponseKinds;
38
39                fn retrieve(kind: #response_name) -> Result<Option<Vec<Self>>, shoal_core::client::Errors> {
40                    // make sure its the right data kind
41                    if let #response_name::#name(action) = kind {
42                        // make sure its a get action
43                        if let shoal_core::shared::responses::ResponseAction::Get(rows) = action.data {
44                            return Ok(rows);
45                        }
46                    }
47                    Err(shoal_core::client::Errors::WrongType("Wrong Type!".to_owned()))
48                }
49            }
50        }
51    );
52}
53
54/// Extend a token stream with a RkyvSupport impl
55///
56/// # Arguments
57///
58/// * `stream` - The stream to extend
59/// * `name` - The name of the type we are extending
60fn add_rkyv_support(stream: &mut proc_macro2::TokenStream, name: &Ident) {
61    // extend our token stream
62    stream.extend(quote! {
63        #[automatically_derived]
64        impl shoal_core::shared::traits::RkyvSupport for #name {}
65    });
66}
67
68/// Extend a token stream with a From<#name> for *QueryKinds implementation
69///
70/// # Arguments
71///
72/// * `stream` - The stream to extend
73/// * `name` - The name of the type we are extending
74/// * `query_name` - The name of the query type
75fn add_from_for_query(stream: &mut proc_macro2::TokenStream, name: &Ident, query_name: &Ident) {
76    // extend our token stream
77    stream.extend(quote! {
78        #[automatically_derived]
79        impl From<#name> for #query_name {
80            fn from(row: #name) -> #query_name {
81                // get our rows partition key
82                let key = #name::get_partition_key(&row);
83                // build our query kind
84                #query_name::#name(Query::Insert { key, row })
85            }
86        }
87    });
88}
89
///// Extend a token stream with a ShoalTable implementation
91/////
92///// # Arguments
93/////
94///// * `stream` - The stream to extend
95///// * `name` - The name of the type we are extending
96///// * `table_name` - The name of the table
97///// * `response_name` - The name of the response type
98//fn add_shoal_table(stream: &mut proc_macro2::TokenStream, name: &Ident) {
99//    // extend our token stream
100//    stream.extend(quote! {
101//        #[automatically_derived]
102//        impl ShoalTable for #name {
103//            /// The sort type for this data
104//            type Sort = String;
105//
106//            /// Build the sort tuple for this row
107//            fn get_sort(&self) -> &Self::Sort {
108//                &self.key
109//            }
110//
111//            /// Calculate the partition key for this row
112//            fn partition_key(sort: &Self::Sort) -> u64 {
113//                // create a new hasher
114//                let mut hasher = GxHasher::default();
115//                // hash the first key
116//                hasher.write(sort.as_bytes());
117//                // get our hash
118//                hasher.finish()
119//            }
120//
121//            /// Any filters to apply when listing/crawling rows
122//            type Filters = String;
123//
124//            /// Determine if a row should be filtered
125//            ///
126//            /// # Arguments
127//            ///
128//            /// * `filters` - The filters to apply
129//            /// * `row` - The row to filter
130//            fn is_filtered(filter: &Self::Filters, row: &Self) -> bool {
131//                &row.value == filter
132//            }
133//        }
134//    });
135//}
136
137/// Derive the basic traits and functions for a type to be a table in shoal
138#[proc_macro_derive(ShoalTable, attributes(shoal_table))]
139pub fn derive_shoal_table(stream: TokenStream) -> TokenStream {
140    // parse our target struct
141    let ast = syn::parse_macro_input!(stream as syn::DeriveInput);
142    // get the name of our struct
143    let name = &ast.ident;
144    // we only support structs right now
145    match &ast.data {
146        Data::Struct(DataStruct { .. }) => (),
147        _ => unimplemented!("Only structs are currently supported"),
148    }
149    // start with an empty stream
150    let mut output = quote! {};
151    let attrs =
152        ShoalTable::from_attributes(&ast.attrs).expect("Failed to parse ShoalTable attributes");
153    // get our db and table name as a ident
154    let db_name = Ident::new(&attrs.db, name.span());
155    //let table_name = Ident::new(&attrs.name, name.span());
156    // build the name of our kinds
157    let query_name = syn::Ident::new(&format!("{}QueryKinds", db_name), name.span());
158    let response_name = syn::Ident::new(&format!("{}ResponseKinds", db_name), name.span());
159    // extend this type
160    add_from_shoal(&mut output, name, &db_name, &response_name);
161    add_rkyv_support(&mut output, name);
162    add_from_for_query(&mut output, name, &query_name);
163    //add_shoal_table(&mut output, name);
164    // convert and return our stream
165    output.into()
166}
167
168fn add_query_kinds(name: &Ident) -> proc_macro2::TokenStream {
169    // build the query kinds type to set
170    quote!(
171        type QueryKinds = concat!(#name, QueryKinds);
172    )
173}
174
175fn add_db_new(name: &Ident, fields: &Fields) -> proc_macro2::TokenStream {
176    // build the entry for each field name and type
177    let field_iter = fields.iter().map(|field| {
178        // get this fields name
179        let ident = &field.ident;
180        let ftype = format_ident!("{}", stringify!(field.ty).split_once('<').unwrap().0);
181        // build this fields entry
182        quote! { #ident: #ftype::new(shared_name, conf).await? }
183    });
184    // build the query kinds type to set
185    quote!(
186        async fn new(shard_name: &str, conf: &Conf) -> Result<Self, ServerError> {
187            let db = #name {
188            //    key_value: PersistentTable::new(shard_name, conf).await?,
189                #(#field_iter,)*
190            };
191            Ok(db)
192        }
193    )
194}
195
196fn add_db_trait(name: &Ident, fields: &Fields, stream: &mut proc_macro2::TokenStream) {
197    // build our new idents
198    let query_kinds = format_ident!("{}QueryKinds", name);
199    let response_kinds = format_ident!("{}ResponseKinds", name);
200    // build our different function implementations
201    let new = add_db_new(name, fields);
202    // build the entry for each field name and type
203    let field_iter = fields.iter().map(|field| {
204        // get this fields name
205        let ident = &field.ident;
206        let ftype = format_ident!("{}", stringify!(field.ty).split_once('<').unwrap().0);
207        // build this fields entry
208        quote! { #ident: #ftype::new(shared_name, conf).await? }
209    });
210    stream.extend(quote! {
211        impl ShoalDatabase for Basic {
212            /// The different tables or types of queries we will handle
213            type QueryKinds = #query_kinds;
214
215            /// The different tables we can get responses from
216            type ResponseKinds = #response_kinds;
217
218            /// Create a new shoal db instance
219            ///
220            /// # Arguments
221            ///
222            /// * `shard_name` - The id of the shard that owns this table
223            /// * `conf` - A shoal config
224            //#new
225            async fn new(shard_name: &str, conf: &Conf) -> Result<Self, ServerError> {
226                let db = Basic {
227                    key_value: PersistentTable::new(shard_name, conf).await?,
228                };
229                Ok(db)
230            }
231
232            /// Handle messages for different table types
233            async fn handle(
234                &mut self,
235                meta: QueryMetadata,
236                typed_query: Self::QueryKinds,
237            ) -> Option<(SocketAddr, Self::ResponseKinds)> {
238                // match on the right query and execute it
239                match typed_query {
240                    BasicQueryKinds::KeyValue(query) => {
241                        // handle these queries
242                        match self.key_value.handle(meta, query).await {
243                            Some((addr, response)) => {
244                                // wrap our response with the right table kind
245                                let wrapped = BasicResponseKinds::KeyValue(response);
246                                Some((addr, wrapped))
247                            }
248                            None => None,
249                        }
250                    }
251                }
252            }
253
254            /// Flush any in flight writes to disk
255            async fn flush(&mut self) -> Result<(), ServerError> {
256                self.key_value.flush().await
257            }
258
259            /// Get all flushed messages and send their response back
260            ///
261            /// # Arguments
262            ///
263            /// * `flushed` - The flushed response to send back
264            fn handle_flushed(&mut self, flushed: &mut Vec<(SocketAddr, Self::ResponseKinds)>) {
265                // get all flushed queries in their specific format
266                let specific = self.key_value.get_flushed();
267                // wrap and add our specific queries
268                let wrapped = specific
269                    .drain(..)
270                    .map(|(addr, resp)| (addr, BasicResponseKinds::KeyValue(resp)));
271                // extend our response list with our wrapped queries
272                flushed.extend(wrapped);
273            }
274
275            /// Shutdown this table and flush any data to disk if needed
276            async fn shutdown(&mut self) -> Result<(), ServerError> {
277                // shutdown the key value table
278                self.key_value.shutdown().await
279            }
280        }
281
282    });
283}
284
285/// Derive the basic traits and functions for a type to be a table in shoal
286#[proc_macro_derive(ShoalDB)]
287pub fn derive_shoal_db(stream: TokenStream) -> TokenStream {
288    // parse our target struct
289    let ast = syn::parse_macro_input!(stream as syn::DeriveInput);
290    // get the name of our struct
291    let name = &ast.ident;
292    // we only support structs right now
293    let struct_data = match &ast.data {
294        Data::Struct(struct_data) => struct_data,
295        _ => unimplemented!("Only structs are currently supported"),
296    };
297    let fields = &struct_data.fields;
298    // start with an empty stream
299    let mut output = quote! {};
300    // add our shoal db trait
301    add_db_trait(name, fields, &mut output);
302    // convert and return our stream
303    output.into()
304}
305
306///// The arguments for a ShoalDB derive
307//#[derive(Debug, darling::FromAttributes)]
308//#[darling(attributes(shoal_db))]
309//struct ShoalDB {
310//    /// The name of this database
311//    pub name: String,
312//}
313//
314///// Extend a token stream with a ShoalDatabase implementation
315/////
316///// # Arguments
317/////
318///// * `stream` - The stream to extend
319///// * `name` - The name of the type we are extending
320///// * `table_name` - The name of the table
321///// * `response_name` - The name of the response type
322//fn add_shoal_database(
323//    stream: &mut proc_macro2::TokenStream,
324//    name: &Ident,
325//    query_name: &Ident,
326//    response_name: &Ident,
327//    archived_response_name: &Ident,
328//    tables: &Vec<Ident>,
329//) {
330//    stream.extend(quote! {
331//        #[automatically_derived]
332//        impl ShoalDatabase for #name {
333//            /// The different tables or types of queries we will handle
334//            type QueryKinds = #query_name;
335//
336//            /// The different tables we can get responses from
337//            type ResponseKinds = #response_name;
338//
339//            /// Deserialize our query types
340//            fn unarchive(buff: &[u8]) -> Queries<Self> {
341//                // try to cast this query
342//                let query = shoal_core::rkyv::check_archived_root::<Queries<Self>>(buff).unwrap();
343//                // deserialize it
344//                query.deserialize(&mut rkyv::Infallible).unwrap()
345//            }
346//
347//            // Deserialize our response types
348//            fn unarchive_response(buff: &[u8]) -> Self::ResponseKinds {
349//                // try to cast this query
350//                let query = shoal_core::rkyv::check_archived_root::<Self::ResponseKinds>(buff).unwrap();
351//                // deserialize it
352//                query.deserialize(&mut rkyv::Infallible).unwrap()
353//            }
354//
355//            // Deserialize our response types
356//            fn response_query_id(buff: &[u8]) -> &Uuid {
357//                // try to cast this query
358//                let kinds = shoal_core::rkyv::check_archived_root::<Self::ResponseKinds>(buff).unwrap();
359//                // pop our table kinds
360//                shoal_core::build_response_query_id_match!(kinds, #archived_response_name, KeyValueRow)
361//            }
362//
363//            /// Get the index of a single [`Self::ResponseKinds`]
364//            ///
365//            /// # Arguments
366//            ///
367//            /// * `resp` - The resp to to get the order index for
368//            fn response_index(resp: &Self::ResponseKinds) -> usize {
369//                shoal_core::build_response_index_match!(resp, #response_name, KeyValueRow)
370//            }
371//
372//            /// Get whether this is the last response in a response stream
373//            ///
374//            /// # Arguments
375//            ///
376//            /// * `resp` - The response to check
377//            fn is_end_of_stream(resp: &Self::ResponseKinds) -> bool {
378//                shoal_core::build_response_end_match!(resp, #response_name, KeyValueRow)
379//            }
380//
381//            /// Forward our queries to the correct shards
382//            async fn send_to_shard(
383//                ring: &Ring,
384//                mesh_tx: &mut Senders<MeshMsg<Self>>,
385//                addr: SocketAddr,
386//                queries: Queries<Self>,
387//            ) -> Result<(), ServerError> {
388//                let mut tmp = Vec::with_capacity(1);
389//                // get the index for the last query in this bundle
390//                let end_index = queries.queries.len() - 1;
391//                // crawl over our queries
392//                for (index, kind) in queries.queries.into_iter().enumerate() {
393//                    // get our target shards info
394//                    match &kind {
395//                        #query_name::KeyValueQuery(query) => {
396//                            // get our shards info
397//                            query.find_shard(ring, &mut tmp);
398//                        }
399//                    };
400//                    // send this query to the right shards
401//                    for shard_info in tmp.drain(..) {
402//                        match &shard_info.contact {
403//                            ShardContact::Local(id) => {
404//                                //println!("coord - sending query to shard: {id}");
405//                                mesh_tx
406//                                    .send_to(
407//                                        *id,
408//                                        MeshMsg::Query {
409//                                            addr,
410//                                            id: queries.id,
411//                                            index,
412//                                            query: kind.clone(),
413//                                            end: index == end_index,
414//                                        },
415//                                    )
416//                                    .await
417//                                    .unwrap();
418//                            }
419//                        };
420//                    }
421//                }
422//                Ok(())
423//            }
424//
425//            /// Handle messages for different table types
426//            async fn handle(
427//                &mut self,
428//                id: Uuid,
429//                index: usize,
430//                typed_query: Self::QueryKinds,
431//                end: bool,
432//            ) -> Self::ResponseKinds {
433//                // match on the right query and execute it
434//                match typed_query {
435//                    #query_name::KeyValueQuery(query) => {
436//                        // handle these queries
437//                        #response_name::KeyValueRow(self.key_value.handle(id, index, query, end))
438//                    }
439//                }
440//            }
441//        }
442//    });
443//}
444
445//#[proc_macro_derive(ShoalDB, attributes(shoal_db))]
446//pub fn derive_shoal_database(stream: TokenStream) -> TokenStream {
447//    // parse our target struct
448//    let ast = syn::parse_macro_input!(stream as syn::DeriveInput);
449//    // get the name of our struct
450//    let name = &ast.ident;
451//    let attrs = ShoalDB::from_attributes(&ast.attrs).expect("Failed to parse ShoalDB attributes");
452//    // build the names for our different derived types
453//    let query_name = syn::Ident::new(&format!("{}QueryKinds", attrs.name), name.span());
454//    let response_name = syn::Ident::new(&format!("{}ResponseKinds", attrs.name), name.span());
455//    let archived_response_name =
456//        syn::Ident::new(&format!("Archived{}ResponseKinds", attrs.name), name.span());
457//    // we only support structs right now
458//    let fields = match &ast.data {
459//        Data::Struct(DataStruct { fields, .. }) => fields,
460//        _ => unimplemented!("Only structs are currently supported"),
461//    };
462//    // start with an empty stream
463//    let mut output = quote! {};
464//    // init a list of the right size for the tables in our database
465//    let mut tables: Vec<Ident> = Vec::with_capacity(fields.len());
466//    // build a list of our field identies
467//    for field in fields {
468//        if let syn::Type::Path(path) = &field.ty {
469//            if let Some(segment) = path.path.segments.first() {
470//                if let PathArguments::AngleBracketed(angled) = &segment.arguments {
471//                    if let Some(arg) = angled.args.first() {
472//                        if let GenericArgument::Type(type_arg) = arg {
473//                            if let syn::Type::Path(path) = type_arg {
474//                                if let Some(segment) = path.path.segments.first() {
475//                                    // add our field identity
476//                                    tables.push(segment.ident.clone());
477//                                }
478//                            }
479//                        }
480//                    }
481//                }
482//            }
483//        }
484//    }
485//    // add a ShoalDatabase implementation
486//    add_shoal_database(
487//        &mut output,
488//        name,
489//        &query_name,
490//        &response_name,
491//        &archived_response_name,
492//        &tables,
493//    );
494//    // convert and return our stream
495//    output.into()
496//}