1use proc_macro::TokenStream;
5use proc_macro2::TokenStream as TokenStream2;
6use quote::{format_ident, quote};
7use syn::{
8 parse_macro_input, punctuated::Punctuated, token::Comma,
9 Data, DeriveInput, Field, Fields, Ident,
10};
11
12#[proc_macro_derive(
43 Store,
44 attributes(
45 store,
46 key,
47 index,
48 unique_index,
49 migrate_from,
50 model_name
51 )
52)]
53pub fn derive_store(input: TokenStream) -> TokenStream {
54 let input = parse_macro_input!(input as DeriveInput);
55 let name = &input.ident;
56
57 let prev_type = input
58 .attrs
59 .iter()
60 .find(|attr| attr.path().is_ident("migrate_from"))
61 .map(|attr| attr.parse_args::<syn::Path>())
62 .transpose()
63 .unwrap_or(None);
64
65 let fields = match &input.data {
66 Data::Struct(data) => match &data.fields {
67 Fields::Named(fields) => &fields.named,
68 _ => panic!("Only named fields are supported"),
69 },
70 _ => panic!("Only structs are supported"),
71 };
72 let key_field = fields
73 .iter()
74 .find(|f| {
75 f.attrs.iter().any(|a| a.path().is_ident("key"))
76 })
77 .expect("A field with #[key] attribute is required");
78
79 let load_method = generate_load_method(fields);
80 let save_method =
81 generate_save_method(name, fields, prev_type.as_ref());
82 let delete_method =
83 generate_delete_method(name, fields, prev_type.as_ref());
84 let index_methods = generate_index_methods(name, fields);
85 let set_methods =
86 generate_set_methods(name, fields, prev_type.as_ref());
87 let all_method = generate_all_method(key_field);
88 let migration_trait = prev_type
89 .as_ref()
90 .map(|prev| generate_migration_trait(name, prev));
91 let ensure_migrations = prev_type
92 .as_ref()
93 .map_or(
94 quote! {
95 pub async fn ensure_migrations(_client: &::tikv_client::TransactionClient) -> Result<(), ::tikv_client::Error> {
96 Ok(())
97 }
98 },
99 |prev| generate_ensure_migrations(name, prev)
100 );
101 let backup_restore = generate_backup_restore_methods();
102
103 quote! {
106 #migration_trait
107
108 impl #name {
109 const MODEL_NAME: &'static str = stringify!(#name);
110
111 #load_method
112 #save_method
113 #delete_method
114 #ensure_migrations
115 #all_method
116 #backup_restore
117 #(#index_methods)*
118 #(#set_methods)*
119 }
120 }
121 .into()
122}
123
124fn generate_load_method(
125 fields: &Punctuated<Field, Comma>,
126) -> TokenStream2 {
127 let key_field = fields
128 .iter()
129 .find(|f| {
130 f.attrs.iter().any(|a| a.path().is_ident("key"))
131 })
132 .expect("A field with #[key] attribute is required");
133 let key_type = &key_field.ty;
134
135 let field_loads = fields.iter().map(|f| {
136 let field_name = &f.ident;
137 let field_type = &f.ty;
138 quote! {
139 let #field_name: #field_type = {
140 let key = format!(
141 "ergokv:{}:{}:{}",
142 Self::MODEL_NAME,
143 ::ergokv::serde_json::to_string(&key)
144 .map_err(|e| tikv_client::Error::StringError(format!("Failed to encode struct key: {e}")))?,
145 stringify!(#field_name)
146 );
147 let value = txn.get(key.clone()).await?
148 .ok_or_else(|| tikv_client::Error::StringError(key.clone()))?;
149 ::ergokv::ciborium::de::from_reader(value.as_slice())
150 .map_err(|e| tikv_client::Error::StringError(format!("Failed to decode {}: {}", stringify!(#field_name), e)))?
151 };
152 }
153 });
154
155 let struct_init = fields.iter().map(|f| {
156 let field_name = &f.ident;
157 quote! { #field_name: #field_name }
158 });
159
160 quote! {
161 pub async fn load(key: &#key_type, txn: &mut tikv_client::Transaction) -> Result<Self, tikv_client::Error> {
162 #(#field_loads)*
163 Ok(Self {
164 #(#struct_init,)*
165 })
166 }
167 }
168}
169
170fn generate_save_method(
171 name: &Ident,
172 fields: &Punctuated<Field, Comma>,
173 prev_type: Option<&syn::Path>,
174) -> TokenStream2 {
175 let key_field = fields
176 .iter()
177 .find(|f| {
178 f.attrs.iter().any(|a| a.path().is_ident("key"))
179 })
180 .expect("A field with #[key] attribute is required");
181 let key_ident = &key_field.ident;
182 let checks = generate_mutation_checks(name, prev_type);
183
184 let field_saves = fields.iter().map(|f| {
185 let field_name = &f.ident;
186 quote! {
187 let key = format!(
188 "ergokv:{}:{}:{}",
189 Self::MODEL_NAME,
190 ::ergokv::serde_json::to_string(&self.#key_ident)
191 .map_err(|e| tikv_client::Error::StringError(format!("Failed to encode struct key: {}", e)))?,
192 stringify!(#field_name)
193 );
194 let mut value = Vec::new();
195 ::ergokv::ciborium::ser::into_writer(&self.#field_name, &mut value)
196 .map_err(|e| tikv_client::Error::StringError(format!("Failed to encode {}: {}", stringify!(#field_name), e)))?;
197 txn.put(key, value).await?;
198 }
199 });
200
201 let index_saves = fields.iter()
202 .filter(|f| f.attrs.iter().any(|a| a.path().is_ident("unique_index") || a.path().is_ident("index")))
203 .map(|f| {
204 let field_name = &f.ident;
205 let is_unique = f.attrs.iter().any(|a| a.path().is_ident("unique_index"));
206
207 if is_unique {
208 quote! {
209 let index_key = format!(
210 "ergokv:{}:unique_index:{}:{}",
211 Self::MODEL_NAME,
212 stringify!(#field_name),
213 ::ergokv::serde_json::to_string(&self.#field_name)
214 .map_err(|e| tikv_client::Error::StringError(format!("Failed to encode struct field: {}", e)))?,
215 );
216 let mut value = Vec::new();
217 ::ergokv::ciborium::ser::into_writer(&self.#key_ident, &mut value)
218 .map_err(|e| tikv_client::Error::StringError(format!("Failed to encode {}: {}", stringify!(#field_name), e)))?;
219 txn.put(index_key, value).await?;
220 }
221 } else {
222 quote! {
223 let index_key = format!(
224 "ergokv:{}:index:{}:{}",
225 Self::MODEL_NAME,
226 stringify!(#field_name),
227 ::ergokv::serde_json::to_string(&self.#field_name)
228 .map_err(|e| tikv_client::Error::StringError(format!("Failed to encode struct field: {}", e)))?,
229 );
230
231 let mut keys = if let Some(existing_keys_bytes) = txn.get(index_key.clone()).await? {
233 ::ergokv::ciborium::de::from_reader(existing_keys_bytes.as_slice())
234 .map_err(|e| tikv_client::Error::StringError(format!("Failed to decode keys: {}", e)))?
235 } else {
236 Vec::new()
237 };
238
239 if !keys.contains(&self.#key_ident) {
241 keys.push(self.#key_ident);
242 }
243
244 let mut value = Vec::new();
246 ::ergokv::ciborium::ser::into_writer(&keys, &mut value)
247 .map_err(|e| tikv_client::Error::StringError(format!("Failed to encode keys: {}", e)))?;
248 txn.put(index_key, value).await?;
249 }
250 }
251 });
252
253 quote! {
254 pub async fn save(&self, txn: &mut tikv_client::Transaction) -> Result<(), tikv_client::Error> {
255 #checks
256
257 let trie = ::ergokv::PrefixTrie::new("ergokv:__trie");
259 trie.insert(
260 txn,
261 &format!(
262 "{}:{}",
263 Self::MODEL_NAME,
264 ::ergokv::serde_json::to_string(&self.#key_ident)
265 .map_err(|e| tikv_client::Error::StringError(format!("Failed to encode struct key: {}", e)))?
266 )
267 ).await?;
268
269 #(#field_saves)*
270 #(#index_saves)*
271 Ok(())
272 }
273 }
274}
275
276fn generate_delete_method(
277 name: &Ident,
278 fields: &Punctuated<Field, Comma>,
279 prev_type: Option<&syn::Path>,
280) -> TokenStream2 {
281 let key_field = fields
282 .iter()
283 .find(|f| {
284 f.attrs.iter().any(|a| a.path().is_ident("key"))
285 })
286 .expect("A field with #[key] attribute is required");
287 let key_ident = &key_field.ident;
288 let key_type = &key_field.ty;
289 let checks = generate_mutation_checks(name, prev_type);
290
291 let field_deletes = fields.iter().map(|f| {
292 let field_name = &f.ident;
293 quote! {
294 let key = format!(
295 "ergokv:{}:{}:{}",
296 Self::MODEL_NAME,
297 ::ergokv::serde_json::to_string(&self.#key_ident)
298 .map_err(|e| tikv_client::Error::StringError(format!("Failed to encode struct key: {}", e)))?,
299 stringify!(#field_name)
300 );
301 txn.delete(key).await?;
302 }
303 });
304
305 let index_deletes = fields.iter()
306 .filter(|f| f.attrs.iter().any(|a| a.path().is_ident("unique_index") || a.path().is_ident("index")))
307 .map(|f| {
308 let field_name = &f.ident;
309 let is_unique = f.attrs.iter().any(|a| a.path().is_ident("unique_index"));
310
311 if is_unique {
312 quote! {
313 let index_key = format!(
314 "ergokv:{}:unique_index:{}:{}",
315 Self::MODEL_NAME,
316 stringify!(#field_name),
317 ::ergokv::serde_json::to_string(&self.#field_name)
318 .map_err(|e| tikv_client::Error::StringError(format!("Failed to encode struct field: {}", e)))?,
319 );
320 txn.delete(index_key).await?;
321 }
322 } else {
323 quote! {
324 let index_key = format!(
325 "ergokv:{}:index:{}:{}",
326 Self::MODEL_NAME,
327 stringify!(#field_name),
328 ::ergokv::serde_json::to_string(&self.#field_name)
329 .map_err(|e| tikv_client::Error::StringError(format!("Failed to encode struct field: {}", e)))?,
330 );
331
332 if let Some(existing_keys_bytes) = txn.get(index_key.clone()).await? {
334 let mut keys: Vec<#key_type> = ::ergokv::ciborium::de::from_reader(existing_keys_bytes.as_slice())
335 .map_err(|e| tikv_client::Error::StringError(format!("Failed to decode keys: {}", e)))?;
336
337 keys.retain(|k| k != &self.#key_ident);
339
340 if keys.is_empty() {
342 txn.delete(index_key).await?;
343 } else {
344 let mut value = Vec::new();
346 ::ergokv::ciborium::ser::into_writer(&keys, &mut value)
347 .map_err(|e| tikv_client::Error::StringError(format!("Failed to encode keys: {}", e)))?;
348 txn.put(index_key, value).await?;
349 }
350 }
351 }
352 }
353 });
354
355 quote! {
356 pub async fn delete(&self, txn: &mut tikv_client::Transaction) -> Result<(), tikv_client::Error> {
357 #checks
358
359 let trie = ::ergokv::PrefixTrie::new("ergokv:__trie");
361 trie.remove(txn, &format!(
362 "{}:{}",
363 Self::MODEL_NAME,
364 ::ergokv::serde_json::to_string(&self.#key_ident)
365 .map_err(|e| tikv_client::Error::StringError(format!("Failed to encode struct key: {}", e)))?,
366 )).await?;
367
368 #(#field_deletes)*
369 #(#index_deletes)*
370 Ok(())
371 }
372 }
373}
374
375fn generate_index_methods(
376 name: &Ident,
377 fields: &Punctuated<Field, Comma>,
378) -> Vec<TokenStream2> {
379 let key_field = fields
380 .iter()
381 .find(|f| {
382 f.attrs.iter().any(|a| a.path().is_ident("key"))
383 })
384 .expect("A field with #[key] attribute is required");
385 let key_type = &key_field.ty;
386
387 fields.iter()
388 .filter(|f| f.attrs.iter().any(|a| a.path().is_ident("unique_index") || a.path().is_ident("index")))
389 .map(|f| {
390 let field_name = &f.ident;
391 let field_type = &f.ty;
392 let method_name = format_ident!("by_{}", field_name.clone().expect("Missing field name"));
393 let is_unique = f.attrs.iter().any(|a| a.path().is_ident("unique_index"));
394
395 if is_unique {
396 quote! {
397 #[doc = concat!("Find a ", stringify!(#name), " by its ", stringify!(#field_name), " field.")]
398 #[doc = ""]
399 #[doc = concat!("This method uses the unique index on the ", stringify!(#field_name), " field to efficiently retrieve the object.")]
400 pub async fn #method_name<T: Into<#field_type>>(value: T, client: &mut tikv_client::Transaction) -> Result<Option<Self>, tikv_client::Error> {
401 let index_key = format!(
402 "ergokv:{}:unique_index:{}:{}",
403 Self::MODEL_NAME,
404 stringify!(#field_name),
405 ::ergokv::serde_json::to_string(&value.into())
406 .map_err(|e| tikv_client::Error::StringError(format!("Failed to encode struct value: {e}")))?
407 );
408 if let Some(key_bytes) = client.get(index_key).await? {
409 let key = ::ergokv::ciborium::de::from_reader(key_bytes.as_slice())
410 .map_err(|e| tikv_client::Error::StringError(format!("Failed to decode key: {}", e)))?;
411
412 Self::load(&key, client).await.map(Some)
413 } else {
414 Ok(None)
415 }
416 }
417 }
418 } else {
419 quote! {
420 #[doc = concat!("Find all ", stringify!(#name), " by its ", stringify!(#field_name), " field.")]
421 #[doc = ""]
422 #[doc = concat!("This method uses the index on the ", stringify!(#field_name), " field to efficiently retrieve multiple objects.")]
423 pub async fn #method_name<T: Into<#field_type>>(value: T, client: &mut tikv_client::Transaction) -> Result<Vec<Self>, tikv_client::Error> {
424 let index_key = format!(
425 "ergokv:{}:index:{}:{}",
426 Self::MODEL_NAME,
427 stringify!(#field_name),
428 ::ergokv::serde_json::to_string(&value.into())
429 .map_err(|e| tikv_client::Error::StringError(format!("Failed to encode struct value: {e}")))?
430 );
431 if let Some(keys_bytes) = client.get(index_key).await? {
432 let keys: Vec<#key_type> = ::ergokv::ciborium::de::from_reader(keys_bytes.as_slice())
433 .map_err(|e| tikv_client::Error::StringError(format!("Failed to decode keys: {}", e)))?;
434
435 let mut results = Vec::new();
436 for key in keys {
437 results.push(Self::load(&key, client).await?);
438 }
439 Ok(results)
440 } else {
441 Ok(Vec::new())
442 }
443 }
444 }
445 }
446 })
447 .collect()
448}
449
450fn generate_set_methods(
451 name: &Ident,
452 fields: &Punctuated<Field, Comma>,
453 prev_type: Option<&syn::Path>,
454) -> Vec<TokenStream2> {
455 fields.iter().map(|f| {
456 let field_name = &f.ident;
457 let field_type = &f.ty;
458 let method_name = format_ident!("set_{}", field_name.clone().expect("Missing field name"));
459 let is_indexed = f.attrs.iter().any(|a| a.path().is_ident("index"));
460 let key_field = fields.iter().find(|f| f.attrs.iter().any(|a| a.path().is_ident("key")))
461 .expect("A field with #[key] attribute is required");
462 let key_ident = &key_field.ident;
463 let checks = generate_mutation_checks(name, prev_type);
464
465 let index_ops = if is_indexed {
466 quote! {
467 let old_index_key = format!(
469 "ergokv:{}:{}:{}",
470 Self::MODEL_NAME,
471 stringify!(#field_name),
472 ::ergokv::serde_json::to_string(&self.#field_name)
473 .map_err(|e| tikv_client::Error::StringError(format!("Failed to encode struct field: {}", e)))?,
474 );
475 txn.delete(old_index_key).await?;
476
477 let new_index_key = format!(
479 "ergokv:{}:{}:{}",
480 Self::MODEL_NAME,
481 stringify!(#field_name),
482 ::ergokv::serde_json::to_string(&self.#field_name)
483 .map_err(|e| tikv_client::Error::StringError(format!("Failed to encode struct field: {}", e)))?,
484 );
485 let mut value = Vec::new();
486 ::ergokv::ciborium::ser::into_writer(&self.#key_ident, &mut value)
487 .map_err(|e| tikv_client::Error::StringError(format!("Failed to encode key: {}", e)))?;
488 txn.put(new_index_key, value).await?;
489 }
490 } else {
491 quote! {}
492 };
493
494 quote! {
495 pub async fn #method_name(&mut self, new_value: #field_type, txn: &mut tikv_client::Transaction) -> Result<(), tikv_client::Error> {
496 #checks
497 #index_ops
498
499 self.#field_name = new_value;
501
502 let key = format!(
504 "ergokv:{}:{}:{}",
505 Self::MODEL_NAME,
506 ::ergokv::serde_json::to_string(&self.#key_ident)
507 .map_err(|e| tikv_client::Error::StringError(format!("Failed to encode struct key: {}", e)))?,
508 stringify!(#field_name)
509 );
510 let mut value = Vec::new();
511 ::ergokv::ciborium::ser::into_writer(&self.#field_name, &mut value)
512 .map_err(|e| tikv_client::Error::StringError(format!("Failed to encode {}: {}", stringify!(#field_name), e)))?;
513 txn.put(key, value).await?;
514
515 Ok(())
516 }
517 }
518 }).collect()
519}
520
521fn generate_all_method(key_field: &Field) -> TokenStream2 {
522 let key_type = &key_field.ty;
523
524 quote! {
525 pub fn all(txn: &mut tikv_client::Transaction) -> impl futures::Stream<Item = Result<Self, tikv_client::Error>> + '_ {
526 use futures::StreamExt;
527 let trie = ::ergokv::PrefixTrie::new("ergokv:__trie");
528
529 async_stream::try_stream! {
530 let keys = trie.find_by_prefix(txn, Self::MODEL_NAME).await?;
531 for key in keys {
532 if let Some(stripped) = key.strip_prefix(&format!("{}:", Self::MODEL_NAME)) {
533 let key: #key_type = ::ergokv::serde_json::from_str(stripped)
534 .map_err(|e| tikv_client::Error::StringError(format!("Failed to decode key: {}", e)))?;
535 yield Self::load(&key, txn).await?;
536 }
537 }
538 }
539 }
540 }
541}
542
543fn generate_migration_trait(
544 name: &Ident,
545 prev_type: &syn::Path,
546) -> TokenStream2 {
547 let trait_name = format_ident!(
549 "{}To{}",
550 prev_type.segments.last().unwrap().ident,
551 name
552 );
553
554 let method_name = format_ident!(
556 "from_{}",
557 prev_type
558 .segments
559 .last()
560 .unwrap()
561 .ident
562 .to_string()
563 .to_lowercase()
564 );
565
566 quote! {
567 pub trait #trait_name {
568 fn #method_name(prev: &#prev_type) -> Result<Self, ::tikv_client::Error>
569 where Self: Sized;
570 }
571 }
572}
573
574fn generate_ensure_migrations(
575 name: &Ident,
576 prev_type: &syn::Path,
577) -> TokenStream2 {
578 let migration_name = format!(
579 "{}->{}",
580 prev_type.segments.last().unwrap().ident,
581 name
582 );
583 let method_name = format_ident!(
584 "from_{}",
585 prev_type
586 .segments
587 .last()
588 .unwrap()
589 .ident
590 .to_string()
591 .to_lowercase()
592 );
593
594 quote! {
595 pub async fn ensure_migrations(client: &::tikv_client::TransactionClient) -> Result<(), ::tikv_client::Error> {
596 let migrations_key = format!("{}:__migrations", Self::MODEL_NAME);
597 let mut txn = client.begin_optimistic().await?;
598
599 let migrations: Vec<String> = if let Some(data) = txn.get(migrations_key.as_bytes().to_vec()).await? {
600 ::ergokv::ciborium::de::from_reader(&data[..])
601 .map_err(|e| ::tikv_client::Error::StringError(format!("{e}")))?
602 } else {
603 Vec::new()
604 };
605
606 txn.commit().await?;
607
608 if !migrations.contains(&#migration_name.to_string()) {
609 #prev_type::ensure_migrations(&client).await?;
610
611 let mut txn = client.begin_optimistic().await?;
612 let mut stream = Box::pin(#prev_type::all(&mut txn));
613
614 {
616 use ::ergokv::futures::StreamExt;
617 let mut stream = stream;
618 while let Some(Ok(prev_item)) = stream.next().await {
619 let mut new_txn = client.begin_optimistic().await?;
620
621 match Self::#method_name(&prev_item) {
622 Ok(new) => {
623 new.save(&mut new_txn).await?;
624 new_txn.commit().await?;
625 }
626 e @ Err(_) => {
627 new_txn.rollback().await?;
628 e?;
629 }
630 };
631 }
632 }
633
634 let mut new_migrations = migrations;
635 new_migrations.push(#migration_name.to_string());
636
637 let mut buf = vec![];
638 ::ergokv::ciborium::ser::into_writer(&new_migrations, &mut buf)
639 .map_err(|e| ::tikv_client::Error::StringError(format!("{e}")))?;
640
641 txn.put(migrations_key.as_bytes().to_vec(), buf).await?;
642
643 txn.commit().await?;
644 }
645
646 Ok(())
647 }
648 }
649}
650
651fn generate_mutation_checks(
652 name: &Ident,
653 prev_type: Option<&syn::Path>,
654) -> TokenStream2 {
655 #[cfg(feature = "strict-migrations")]
656 {
657 let prev_check = prev_type.map(|prev| {
658 quote! {
659 if !migrations.contains(&format!("{}->{}",
660 stringify!(#prev),
661 stringify!(#name)
662 )) {
663 return Err(::tikv_client::Error::StringError(
664 format!("Previous migration {}=>{} not applied", stringify!(#prev), stringify!(#name))
665 ));
666 }
667 }
668 });
669
670 quote! {
671 let migrations_key = format!("{}:__migrations", Self::MODEL_NAME);
672 let migrations: Vec<String> = if let Some(data) = txn.get(&migrations_key).await? {
673 ::ergokv::ciborium::de::from_reader(&data[..])?
674 } else {
675 Vec::new()
676 };
677
678 if migrations.iter().any(|m| m.starts_with(&format!("{}->", stringify!(#name)))) {
679 return Err(::tikv_client::Error::StringError(
680 format!("Cannot modify {} - newer version exists", stringify!(#name))
681 ));
682 }
683
684 #prev_check
685 }
686 }
687
688 #[cfg(not(feature = "strict-migrations"))]
689 {
690 let _unused_prev = prev_type;
692 let _unused_name = name;
693 quote! {}
694 }
695}
696
697fn generate_backup_restore_methods() -> TokenStream2 {
699 quote! {
700 pub async fn backup(txn: &mut tikv_client::Transaction, path: impl AsRef<std::path::Path>) -> Result<std::path::PathBuf, tikv_client::Error> {
739 use std::io::Write;
740 use futures::StreamExt;
741
742 let timestamp = std::time::SystemTime::now()
743 .duration_since(std::time::UNIX_EPOCH)
744 .map_err(|e| tikv_client::Error::StringError(e.to_string()))?
745 .as_secs();
746
747 let filename = format!("{}_{}.json", Self::MODEL_NAME, timestamp);
748 let backup_path = path.as_ref().join(filename);
749
750 let mut file = std::fs::File::create(&backup_path)
751 .map_err(|e| tikv_client::Error::StringError(format!("Failed to create backup file: {}", e)))?;
752
753 let mut stream = Box::pin(Self::all(txn));
754 while let Some(item) = stream.next().await {
755 let item = item?;
756 let json = serde_json::to_string(&item)
757 .map_err(|e| tikv_client::Error::StringError(format!("Failed to serialize: {}", e)))?;
758 writeln!(file, "{}", json)
759 .map_err(|e| tikv_client::Error::StringError(format!("Failed to write: {}", e)))?;
760 }
761
762 Ok(backup_path)
763 }
764
765 pub async fn restore(txn: &mut tikv_client::Transaction, path: impl AsRef<std::path::Path>) -> Result<(), tikv_client::Error> {
805 use std::io::BufRead;
806
807 let file = std::fs::File::open(path)
808 .map_err(|e| tikv_client::Error::StringError(format!("Failed to open backup file: {}", e)))?;
809
810 let reader = std::io::BufReader::new(file);
811 for line in reader.lines() {
812 let line = line
813 .map_err(|e| tikv_client::Error::StringError(format!("Failed to read line: {}", e)))?;
814
815 let item: Self = serde_json::from_str(&line)
816 .map_err(|e| tikv_client::Error::StringError(format!("Failed to deserialize: {}", e)))?;
817
818 item.save(txn).await?;
819 }
820
821 Ok(())
822 }
823 }
824}