1use std::collections::{HashMap, HashSet};
16
17use redis::Value;
18use tokio::runtime::Runtime;
19
20use crate::connection::RedisConnection;
21use crate::error::{Error, Result};
22
/// Number of keys queued per Redis pipeline round-trip; bounds pipeline size
/// and client-side memory for a single batch.
const DEFAULT_WRITE_BATCH_SIZE: usize = 1000;
25
/// Policy applied when a target key already exists in Redis.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum WriteMode {
    /// Do not touch keys that already exist; the writers count them as
    /// skipped rather than returning an error.
    Fail,
    /// Delete each existing key before writing (the default).
    #[default]
    Replace,
    /// Write into existing keys without deleting them first.
    Append,
}
37
38impl std::str::FromStr for WriteMode {
39 type Err = Error;
40
41 fn from_str(s: &str) -> Result<Self> {
42 match s.to_lowercase().as_str() {
43 "fail" => Ok(WriteMode::Fail),
44 "replace" => Ok(WriteMode::Replace),
45 "append" => Ok(WriteMode::Append),
46 _ => Err(Error::InvalidInput(format!(
47 "Invalid write mode '{}'. Expected: fail, replace, or append",
48 s
49 ))),
50 }
51 }
52}
53
/// Summary counters for a bulk write operation.
#[derive(Debug, Clone)]
pub struct WriteResult {
    /// Keys whose write pipeline completed successfully.
    pub keys_written: usize,
    /// Keys whose write pipeline returned an error.
    pub keys_failed: usize,
    /// Keys left untouched because they already existed ([`WriteMode::Fail`]).
    pub keys_skipped: usize,
}
64
/// A single per-key failure captured during a detailed write.
#[derive(Debug, Clone)]
pub struct KeyError {
    /// The Redis key that failed.
    pub key: String,
    /// The error message reported for that key.
    pub error: String,
}
73
/// Like [`WriteResult`], but also records which keys succeeded and the
/// per-key errors for those that failed.
#[derive(Debug, Clone)]
pub struct WriteResultDetailed {
    /// Keys whose write pipeline completed successfully.
    pub keys_written: usize,
    /// Keys whose write pipeline returned an error.
    pub keys_failed: usize,
    /// Keys left untouched because they already existed ([`WriteMode::Fail`]).
    pub keys_skipped: usize,
    /// The keys counted in `keys_written`, in write order.
    pub succeeded_keys: Vec<String>,
    /// One entry per failed key with its error message.
    pub errors: Vec<KeyError>,
}
107
108impl WriteResultDetailed {
109 pub fn new() -> Self {
111 Self {
112 keys_written: 0,
113 keys_failed: 0,
114 keys_skipped: 0,
115 succeeded_keys: Vec::new(),
116 errors: Vec::new(),
117 }
118 }
119
120 pub fn failed_keys(&self) -> Vec<&str> {
122 self.errors.iter().map(|e| e.key.as_str()).collect()
123 }
124
125 pub fn error_map(&self) -> HashMap<&str, &str> {
127 self.errors
128 .iter()
129 .map(|e| (e.key.as_str(), e.error.as_str()))
130 .collect()
131 }
132
133 pub fn is_complete_success(&self) -> bool {
135 self.keys_failed == 0
136 }
137
138 pub fn to_basic(&self) -> WriteResult {
140 WriteResult {
141 keys_written: self.keys_written,
142 keys_failed: self.keys_failed,
143 keys_skipped: self.keys_skipped,
144 }
145 }
146}
147
148impl Default for WriteResultDetailed {
149 fn default() -> Self {
150 Self::new()
151 }
152}
153
154pub fn write_hashes(
167 url: &str,
168 keys: Vec<String>,
169 fields: Vec<String>,
170 values: Vec<Vec<Option<String>>>,
171 ttl: Option<i64>,
172 if_exists: WriteMode,
173) -> Result<WriteResult> {
174 let runtime =
175 Runtime::new().map_err(|e| Error::Runtime(format!("Failed to create runtime: {}", e)))?;
176
177 let connection = RedisConnection::new(url)?;
178
179 runtime.block_on(async {
180 let mut conn = connection.get_async_connection().await?;
181 write_hashes_async(&mut conn, keys, fields, values, ttl, if_exists).await
182 })
183}
184
/// Async core of [`write_hashes`]: writes hashes in pipelined batches of
/// [`DEFAULT_WRITE_BATCH_SIZE`] keys.
///
/// Per batch: (1) in `Fail` mode, pipelines EXISTS checks and skips keys
/// that already exist; (2) in `Replace` mode, pipelines DEL for every key;
/// (3) queues HSET (plus EXPIRE when `ttl` is set) for each remaining key
/// and executes the pipeline once. Success/failure is judged per pipeline:
/// if the batch pipeline errors, every key queued in it counts as failed.
async fn write_hashes_async(
    conn: &mut redis::aio::MultiplexedConnection,
    keys: Vec<String>,
    fields: Vec<String>,
    values: Vec<Vec<Option<String>>>,
    ttl: Option<i64>,
    if_exists: WriteMode,
) -> Result<WriteResult> {
    let mut keys_written = 0;
    let mut keys_failed = 0;
    let mut keys_skipped = 0;

    for batch_start in (0..keys.len()).step_by(DEFAULT_WRITE_BATCH_SIZE) {
        let batch_end = (batch_start + DEFAULT_WRITE_BATCH_SIZE).min(keys.len());
        let batch_keys = &keys[batch_start..batch_end];

        // Batch-local indices of keys that already exist (Fail mode only).
        // NOTE(review): a failed EXISTS pipeline is swallowed by
        // unwrap_or_default(), which treats every key as absent and lets the
        // writes proceed — confirm this best-effort behavior is intended.
        let existing_keys: HashSet<usize> = if if_exists == WriteMode::Fail {
            let mut pipe = redis::pipe();
            for key in batch_keys {
                pipe.exists(key);
            }
            let exists_results: Vec<bool> = pipe.query_async(conn).await.unwrap_or_default();
            exists_results
                .into_iter()
                .enumerate()
                .filter_map(|(i, exists)| if exists { Some(i) } else { None })
                .collect()
        } else {
            HashSet::new()
        };

        // Replace mode: clear old hashes first; DEL errors are best-effort.
        if if_exists == WriteMode::Replace {
            let mut del_pipe = redis::pipe();
            for key in batch_keys {
                del_pipe.del(key).ignore();
            }
            let _ = del_pipe.query_async::<()>(conn).await;
        }

        let mut pipe = redis::pipe();
        // Batch indices actually queued, used for the written/failed tally.
        let mut batch_indices: Vec<usize> = Vec::new();

        for (batch_idx, key) in batch_keys.iter().enumerate() {
            let global_idx = batch_start + batch_idx;

            if existing_keys.contains(&batch_idx) {
                keys_skipped += 1;
                continue;
            }

            // NOTE(review): keys without a corresponding values row are
            // dropped here without being counted in any bucket, so the three
            // counters may not sum to keys.len() — confirm intended.
            if global_idx >= values.len() {
                break;
            }

            let row = &values[global_idx];
            let mut hash_data: Vec<(&str, &str)> = Vec::new();

            // None cells are omitted from the hash rather than written empty.
            for (j, field) in fields.iter().enumerate() {
                if j < row.len()
                    && let Some(value) = &row[j]
                {
                    hash_data.push((field.as_str(), value.as_str()));
                }
            }

            // All-None rows produce no HSET and are not counted anywhere.
            if !hash_data.is_empty() {
                pipe.hset_multiple(key, &hash_data);
                if let Some(seconds) = ttl {
                    pipe.expire(key, seconds);
                }
                batch_indices.push(batch_idx);
            }
        }

        if !batch_indices.is_empty() {
            match pipe.query_async::<()>(conn).await {
                Ok(_) => keys_written += batch_indices.len(),
                Err(_) => keys_failed += batch_indices.len(),
            }
        }
    }

    Ok(WriteResult {
        keys_written,
        keys_failed,
        keys_skipped,
    })
}
280
281pub fn write_hashes_detailed(
317 url: &str,
318 keys: Vec<String>,
319 fields: Vec<String>,
320 values: Vec<Vec<Option<String>>>,
321 ttl: Option<i64>,
322 if_exists: WriteMode,
323) -> Result<WriteResultDetailed> {
324 let runtime =
325 Runtime::new().map_err(|e| Error::Runtime(format!("Failed to create runtime: {}", e)))?;
326
327 let connection = RedisConnection::new(url)?;
328
329 runtime.block_on(async {
330 let mut conn = connection.get_async_connection().await?;
331 write_hashes_detailed_async(&mut conn, keys, fields, values, ttl, if_exists).await
332 })
333}
334
/// Async core of [`write_hashes_detailed`]: same batching and write strategy
/// as [`write_hashes_async`], but inspects each pipeline response value to
/// attribute success or failure to individual keys.
///
/// The attribution assumes the pipeline returns one [`Value`] per queued
/// command, in queue order; `key_command_counts` records how many commands
/// (HSET, plus optional EXPIRE) were queued for each key so responses can be
/// consumed in matching chunks.
async fn write_hashes_detailed_async(
    conn: &mut redis::aio::MultiplexedConnection,
    keys: Vec<String>,
    fields: Vec<String>,
    values: Vec<Vec<Option<String>>>,
    ttl: Option<i64>,
    if_exists: WriteMode,
) -> Result<WriteResultDetailed> {
    let mut result = WriteResultDetailed::new();
    for batch_start in (0..keys.len()).step_by(DEFAULT_WRITE_BATCH_SIZE) {
        let batch_end = (batch_start + DEFAULT_WRITE_BATCH_SIZE).min(keys.len());
        let batch_keys = &keys[batch_start..batch_end];

        // Batch-local indices of keys that already exist (Fail mode only).
        // NOTE(review): EXISTS pipeline errors are swallowed by
        // unwrap_or_default(), treating every key as absent — confirm.
        let existing_keys: HashSet<usize> = if if_exists == WriteMode::Fail {
            let mut pipe = redis::pipe();
            for key in batch_keys {
                pipe.exists(key);
            }
            let exists_results: Vec<bool> = pipe.query_async(conn).await.unwrap_or_default();
            exists_results
                .into_iter()
                .enumerate()
                .filter_map(|(i, exists)| if exists { Some(i) } else { None })
                .collect()
        } else {
            HashSet::new()
        };

        // Replace mode: best-effort DEL of the whole batch first.
        if if_exists == WriteMode::Replace {
            let mut del_pipe = redis::pipe();
            for key in batch_keys {
                del_pipe.del(key).ignore();
            }
            let _ = del_pipe.query_async::<()>(conn).await;
        }

        let mut pipe = redis::pipe();
        // (key, number of commands queued for it) — drives response parsing.
        let mut key_command_counts: Vec<(String, usize)> = Vec::new();

        for (batch_idx, key) in batch_keys.iter().enumerate() {
            let global_idx = batch_start + batch_idx;

            if existing_keys.contains(&batch_idx) {
                result.keys_skipped += 1;
                continue;
            }

            // NOTE(review): keys without a values row are silently dropped
            // from all counters (see write_hashes_async) — confirm intended.
            if global_idx >= values.len() {
                break;
            }

            let row = &values[global_idx];
            let mut hash_data: Vec<(&str, &str)> = Vec::new();

            // None cells are omitted from the hash.
            for (j, field) in fields.iter().enumerate() {
                if j < row.len()
                    && let Some(value) = &row[j]
                {
                    hash_data.push((field.as_str(), value.as_str()));
                }
            }

            if !hash_data.is_empty() {
                pipe.hset_multiple(key, &hash_data);
                let mut cmd_count = 1;
                if let Some(seconds) = ttl {
                    pipe.expire(key, seconds);
                    cmd_count += 1;
                }
                key_command_counts.push((key.clone(), cmd_count));
            }
        }

        if !key_command_counts.is_empty() {
            match pipe.query_async::<Vec<Value>>(conn).await {
                Ok(responses) => {
                    // Walk the flat response list, consuming cmd_count
                    // entries per key; any ServerError in a key's chunk
                    // marks that key failed (last error message wins).
                    let mut response_idx = 0;
                    for (key, cmd_count) in &key_command_counts {
                        let mut key_succeeded = true;
                        let mut key_error = String::new();

                        for _ in 0..*cmd_count {
                            if response_idx < responses.len() {
                                if let Value::ServerError(err) = &responses[response_idx] {
                                    key_succeeded = false;
                                    key_error = err.to_string();
                                }
                                response_idx += 1;
                            }
                        }

                        if key_succeeded {
                            result.keys_written += 1;
                            result.succeeded_keys.push(key.clone());
                        } else {
                            result.keys_failed += 1;
                            result.errors.push(KeyError {
                                key: key.clone(),
                                error: key_error,
                            });
                        }
                    }
                }
                Err(e) => {
                    // Whole-pipeline failure: attribute the same error to
                    // every key that was queued in this batch.
                    for (key, _) in key_command_counts {
                        result.keys_failed += 1;
                        result.errors.push(KeyError {
                            key,
                            error: e.to_string(),
                        });
                    }
                }
            }
        }
    }

    Ok(result)
}
464
465pub fn write_json(
477 url: &str,
478 keys: Vec<String>,
479 json_strings: Vec<String>,
480 ttl: Option<i64>,
481 if_exists: WriteMode,
482) -> Result<WriteResult> {
483 let runtime =
484 Runtime::new().map_err(|e| Error::Runtime(format!("Failed to create runtime: {}", e)))?;
485
486 let connection = RedisConnection::new(url)?;
487
488 runtime.block_on(async {
489 let mut conn = connection.get_async_connection().await?;
490 write_json_async(&mut conn, keys, json_strings, ttl, if_exists).await
491 })
492}
493
/// Async core of [`write_json`]: issues `JSON.SET key $ <json>` (plus
/// optional EXPIRE) in pipelined batches.
///
/// Unlike the hash/list/set writers there is no DEL pass for Replace mode;
/// JSON.SET at the root path presumably overwrites the whole document —
/// NOTE(review): confirm, and note that Replace and Append therefore behave
/// identically here.
async fn write_json_async(
    conn: &mut redis::aio::MultiplexedConnection,
    keys: Vec<String>,
    json_strings: Vec<String>,
    ttl: Option<i64>,
    if_exists: WriteMode,
) -> Result<WriteResult> {
    let mut keys_written = 0;
    let mut keys_failed = 0;
    let mut keys_skipped = 0;

    // Pair each key with its document; zip truncates to the shorter input.
    let items: Vec<_> = keys.iter().zip(json_strings.iter()).collect();

    for batch_start in (0..items.len()).step_by(DEFAULT_WRITE_BATCH_SIZE) {
        let batch_end = (batch_start + DEFAULT_WRITE_BATCH_SIZE).min(items.len());
        let batch_items = &items[batch_start..batch_end];

        // Batch-local indices of keys that already exist (Fail mode only).
        // NOTE(review): EXISTS errors collapse to "nothing exists" via
        // unwrap_or_default() — confirm best-effort is intended.
        let existing_keys: HashSet<usize> = if if_exists == WriteMode::Fail {
            let mut pipe = redis::pipe();
            for (key, _) in batch_items {
                pipe.exists(*key);
            }
            let exists_results: Vec<bool> = pipe.query_async(conn).await.unwrap_or_default();
            exists_results
                .into_iter()
                .enumerate()
                .filter_map(|(i, exists)| if exists { Some(i) } else { None })
                .collect()
        } else {
            HashSet::new()
        };

        let mut pipe = redis::pipe();
        let mut batch_count = 0;

        for (batch_idx, (key, json_str)) in batch_items.iter().enumerate() {
            if existing_keys.contains(&batch_idx) {
                keys_skipped += 1;
                continue;
            }

            pipe.cmd("JSON.SET").arg(*key).arg("$").arg(*json_str);
            if let Some(seconds) = ttl {
                pipe.expire(*key, seconds);
            }
            batch_count += 1;
        }

        // One round-trip per batch; a pipeline error fails every queued key.
        if batch_count > 0 {
            match pipe.query_async::<()>(conn).await {
                Ok(_) => keys_written += batch_count,
                Err(_) => keys_failed += batch_count,
            }
        }
    }

    Ok(WriteResult {
        keys_written,
        keys_failed,
        keys_skipped,
    })
}
562
563pub fn write_strings(
575 url: &str,
576 keys: Vec<String>,
577 values: Vec<Option<String>>,
578 ttl: Option<i64>,
579 if_exists: WriteMode,
580) -> Result<WriteResult> {
581 let runtime =
582 Runtime::new().map_err(|e| Error::Runtime(format!("Failed to create runtime: {}", e)))?;
583
584 let connection = RedisConnection::new(url)?;
585
586 runtime.block_on(async {
587 let mut conn = connection.get_async_connection().await?;
588 write_strings_async(&mut conn, keys, values, ttl, if_exists).await
589 })
590}
591
/// Async core of [`write_strings`]: SET (or SETEX when `ttl` is given) in
/// pipelined batches.
///
/// No DEL pass is needed for Replace mode because SET/SETEX overwrite the
/// value outright, making Replace and Append equivalent here.
async fn write_strings_async(
    conn: &mut redis::aio::MultiplexedConnection,
    keys: Vec<String>,
    values: Vec<Option<String>>,
    ttl: Option<i64>,
    if_exists: WriteMode,
) -> Result<WriteResult> {
    let mut keys_written = 0;
    let mut keys_failed = 0;
    let mut keys_skipped = 0;

    // Drop keys whose value is None up front.
    // NOTE(review): dropped keys appear in no counter, so the counters may
    // not sum to keys.len() — confirm intended.
    let items: Vec<_> = keys
        .iter()
        .zip(values.iter())
        .filter_map(|(k, v)| v.as_ref().map(|val| (k, val)))
        .collect();

    for batch_start in (0..items.len()).step_by(DEFAULT_WRITE_BATCH_SIZE) {
        let batch_end = (batch_start + DEFAULT_WRITE_BATCH_SIZE).min(items.len());
        let batch_items = &items[batch_start..batch_end];

        // Batch-local indices of keys that already exist (Fail mode only).
        // NOTE(review): EXISTS errors collapse to "nothing exists" via
        // unwrap_or_default() — confirm best-effort is intended.
        let existing_keys: HashSet<usize> = if if_exists == WriteMode::Fail {
            let mut pipe = redis::pipe();
            for (key, _) in batch_items {
                pipe.exists(*key);
            }
            let exists_results: Vec<bool> = pipe.query_async(conn).await.unwrap_or_default();
            exists_results
                .into_iter()
                .enumerate()
                .filter_map(|(i, exists)| if exists { Some(i) } else { None })
                .collect()
        } else {
            HashSet::new()
        };

        let mut pipe = redis::pipe();
        let mut batch_count = 0;

        for (batch_idx, (key, val)) in batch_items.iter().enumerate() {
            if existing_keys.contains(&batch_idx) {
                keys_skipped += 1;
                continue;
            }

            // SETEX sets value and TTL atomically in one command.
            if let Some(seconds) = ttl {
                pipe.cmd("SETEX").arg(*key).arg(seconds).arg(*val);
            } else {
                pipe.set(*key, *val);
            }
            batch_count += 1;
        }

        // One round-trip per batch; a pipeline error fails every queued key.
        if batch_count > 0 {
            match pipe.query_async::<()>(conn).await {
                Ok(_) => keys_written += batch_count,
                Err(_) => keys_failed += batch_count,
            }
        }
    }

    Ok(WriteResult {
        keys_written,
        keys_failed,
        keys_skipped,
    })
}
670
671pub fn write_lists(
683 url: &str,
684 keys: Vec<String>,
685 elements: Vec<Vec<String>>,
686 ttl: Option<i64>,
687 if_exists: WriteMode,
688) -> Result<WriteResult> {
689 let runtime =
690 Runtime::new().map_err(|e| Error::Runtime(format!("Failed to create runtime: {}", e)))?;
691
692 let connection = RedisConnection::new(url)?;
693
694 runtime.block_on(async {
695 let mut conn = connection.get_async_connection().await?;
696 write_lists_async(&mut conn, keys, elements, ttl, if_exists).await
697 })
698}
699
/// Async core of [`write_lists`]: RPUSH (plus optional EXPIRE) in pipelined
/// batches. In `Replace` mode each batch's keys are DEL'd first so the push
/// rebuilds the list; in `Append` mode RPUSH extends any existing list.
async fn write_lists_async(
    conn: &mut redis::aio::MultiplexedConnection,
    keys: Vec<String>,
    elements: Vec<Vec<String>>,
    ttl: Option<i64>,
    if_exists: WriteMode,
) -> Result<WriteResult> {
    let mut keys_written = 0;
    let mut keys_failed = 0;
    let mut keys_skipped = 0;

    // Pair each key with its elements; zip truncates to the shorter input.
    let items: Vec<_> = keys.iter().zip(elements.iter()).collect();

    for batch_start in (0..items.len()).step_by(DEFAULT_WRITE_BATCH_SIZE) {
        let batch_end = (batch_start + DEFAULT_WRITE_BATCH_SIZE).min(items.len());
        let batch_items = &items[batch_start..batch_end];

        // Batch-local indices of keys that already exist (Fail mode only).
        // NOTE(review): EXISTS errors collapse to "nothing exists" via
        // unwrap_or_default() — confirm best-effort is intended.
        let existing_keys: HashSet<usize> = if if_exists == WriteMode::Fail {
            let mut pipe = redis::pipe();
            for (key, _) in batch_items {
                pipe.exists(*key);
            }
            let exists_results: Vec<bool> = pipe.query_async(conn).await.unwrap_or_default();
            exists_results
                .into_iter()
                .enumerate()
                .filter_map(|(i, exists)| if exists { Some(i) } else { None })
                .collect()
        } else {
            HashSet::new()
        };

        // Replace mode: best-effort DEL of the whole batch first.
        if if_exists == WriteMode::Replace {
            let mut del_pipe = redis::pipe();
            for (key, _) in batch_items {
                del_pipe.del(*key).ignore();
            }
            let _ = del_pipe.query_async::<()>(conn).await;
        }

        let mut pipe = redis::pipe();
        let mut batch_count = 0;

        for (batch_idx, (key, list_elements)) in batch_items.iter().enumerate() {
            if existing_keys.contains(&batch_idx) {
                keys_skipped += 1;
                continue;
            }

            // Empty element lists are skipped silently (RPUSH needs >= 1
            // element) and appear in no counter.
            if list_elements.is_empty() {
                continue;
            }

            pipe.rpush(*key, *list_elements);
            if let Some(seconds) = ttl {
                pipe.expire(*key, seconds);
            }
            batch_count += 1;
        }

        // One round-trip per batch; a pipeline error fails every queued key.
        if batch_count > 0 {
            match pipe.query_async::<()>(conn).await {
                Ok(_) => keys_written += batch_count,
                Err(_) => keys_failed += batch_count,
            }
        }
    }

    Ok(WriteResult {
        keys_written,
        keys_failed,
        keys_skipped,
    })
}
782
783pub fn write_sets(
795 url: &str,
796 keys: Vec<String>,
797 members: Vec<Vec<String>>,
798 ttl: Option<i64>,
799 if_exists: WriteMode,
800) -> Result<WriteResult> {
801 let runtime =
802 Runtime::new().map_err(|e| Error::Runtime(format!("Failed to create runtime: {}", e)))?;
803
804 let connection = RedisConnection::new(url)?;
805
806 runtime.block_on(async {
807 let mut conn = connection.get_async_connection().await?;
808 write_sets_async(&mut conn, keys, members, ttl, if_exists).await
809 })
810}
811
/// Async core of [`write_sets`]: SADD (plus optional EXPIRE) in pipelined
/// batches. In `Replace` mode each batch's keys are DEL'd first so the set
/// is rebuilt; in `Append` mode SADD merges into any existing set.
async fn write_sets_async(
    conn: &mut redis::aio::MultiplexedConnection,
    keys: Vec<String>,
    members: Vec<Vec<String>>,
    ttl: Option<i64>,
    if_exists: WriteMode,
) -> Result<WriteResult> {
    let mut keys_written = 0;
    let mut keys_failed = 0;
    let mut keys_skipped = 0;

    // Pair each key with its members; zip truncates to the shorter input.
    let items: Vec<_> = keys.iter().zip(members.iter()).collect();

    for batch_start in (0..items.len()).step_by(DEFAULT_WRITE_BATCH_SIZE) {
        let batch_end = (batch_start + DEFAULT_WRITE_BATCH_SIZE).min(items.len());
        let batch_items = &items[batch_start..batch_end];

        // Batch-local indices of keys that already exist (Fail mode only).
        // NOTE(review): EXISTS errors collapse to "nothing exists" via
        // unwrap_or_default() — confirm best-effort is intended.
        let existing_keys: HashSet<usize> = if if_exists == WriteMode::Fail {
            let mut pipe = redis::pipe();
            for (key, _) in batch_items {
                pipe.exists(*key);
            }
            let exists_results: Vec<bool> = pipe.query_async(conn).await.unwrap_or_default();
            exists_results
                .into_iter()
                .enumerate()
                .filter_map(|(i, exists)| if exists { Some(i) } else { None })
                .collect()
        } else {
            HashSet::new()
        };

        // Replace mode: best-effort DEL of the whole batch first.
        if if_exists == WriteMode::Replace {
            let mut del_pipe = redis::pipe();
            for (key, _) in batch_items {
                del_pipe.del(*key).ignore();
            }
            let _ = del_pipe.query_async::<()>(conn).await;
        }

        let mut pipe = redis::pipe();
        let mut batch_count = 0;

        for (batch_idx, (key, set_members)) in batch_items.iter().enumerate() {
            if existing_keys.contains(&batch_idx) {
                keys_skipped += 1;
                continue;
            }

            // Empty member lists are skipped silently (SADD needs >= 1
            // member) and appear in no counter.
            if set_members.is_empty() {
                continue;
            }

            pipe.sadd(*key, *set_members);
            if let Some(seconds) = ttl {
                pipe.expire(*key, seconds);
            }
            batch_count += 1;
        }

        // One round-trip per batch; a pipeline error fails every queued key.
        if batch_count > 0 {
            match pipe.query_async::<()>(conn).await {
                Ok(_) => keys_written += batch_count,
                Err(_) => keys_failed += batch_count,
            }
        }
    }

    Ok(WriteResult {
        keys_written,
        keys_failed,
        keys_skipped,
    })
}
894
895pub fn write_zsets(
907 url: &str,
908 keys: Vec<String>,
909 members: Vec<Vec<(String, f64)>>,
910 ttl: Option<i64>,
911 if_exists: WriteMode,
912) -> Result<WriteResult> {
913 let runtime =
914 Runtime::new().map_err(|e| Error::Runtime(format!("Failed to create runtime: {}", e)))?;
915
916 let connection = RedisConnection::new(url)?;
917
918 runtime.block_on(async {
919 let mut conn = connection.get_async_connection().await?;
920 write_zsets_async(&mut conn, keys, members, ttl, if_exists).await
921 })
922}
923
/// Async core of [`write_zsets`]: ZADD (plus optional EXPIRE) in pipelined
/// batches. In `Replace` mode each batch's keys are DEL'd first so the
/// sorted set is rebuilt; in `Append` mode ZADD merges into any existing one.
async fn write_zsets_async(
    conn: &mut redis::aio::MultiplexedConnection,
    keys: Vec<String>,
    members: Vec<Vec<(String, f64)>>,
    ttl: Option<i64>,
    if_exists: WriteMode,
) -> Result<WriteResult> {
    let mut keys_written = 0;
    let mut keys_failed = 0;
    let mut keys_skipped = 0;

    // Pair each key with its members; zip truncates to the shorter input.
    let items: Vec<_> = keys.iter().zip(members.iter()).collect();

    for batch_start in (0..items.len()).step_by(DEFAULT_WRITE_BATCH_SIZE) {
        let batch_end = (batch_start + DEFAULT_WRITE_BATCH_SIZE).min(items.len());
        let batch_items = &items[batch_start..batch_end];

        // Batch-local indices of keys that already exist (Fail mode only).
        // NOTE(review): EXISTS errors collapse to "nothing exists" via
        // unwrap_or_default() — confirm best-effort is intended.
        let existing_keys: HashSet<usize> = if if_exists == WriteMode::Fail {
            let mut pipe = redis::pipe();
            for (key, _) in batch_items {
                pipe.exists(*key);
            }
            let exists_results: Vec<bool> = pipe.query_async(conn).await.unwrap_or_default();
            exists_results
                .into_iter()
                .enumerate()
                .filter_map(|(i, exists)| if exists { Some(i) } else { None })
                .collect()
        } else {
            HashSet::new()
        };

        // Replace mode: best-effort DEL of the whole batch first.
        if if_exists == WriteMode::Replace {
            let mut del_pipe = redis::pipe();
            for (key, _) in batch_items {
                del_pipe.del(*key).ignore();
            }
            let _ = del_pipe.query_async::<()>(conn).await;
        }

        let mut pipe = redis::pipe();
        let mut batch_count = 0;

        for (batch_idx, (key, zset_members)) in batch_items.iter().enumerate() {
            if existing_keys.contains(&batch_idx) {
                keys_skipped += 1;
                continue;
            }

            // Empty member lists are skipped silently and appear in no
            // counter.
            if zset_members.is_empty() {
                continue;
            }

            // zadd_multiple expects (score, member) order, so swap the
            // incoming (member, score) tuples.
            let score_members: Vec<(f64, &str)> =
                zset_members.iter().map(|(m, s)| (*s, m.as_str())).collect();

            pipe.zadd_multiple(*key, &score_members);
            if let Some(seconds) = ttl {
                pipe.expire(*key, seconds);
            }
            batch_count += 1;
        }

        // One round-trip per batch; a pipeline error fails every queued key.
        if batch_count > 0 {
            match pipe.query_async::<()>(conn).await {
                Ok(_) => keys_written += batch_count,
                Err(_) => keys_failed += batch_count,
            }
        }
    }

    Ok(WriteResult {
        keys_written,
        keys_failed,
        keys_skipped,
    })
}
1010
// Unit tests for the pure (non-Redis) parts of this module: the result
// types, their helpers, and WriteMode parsing. The network writers are not
// exercised here.
#[cfg(test)]
mod tests {
    use super::*;

    // WriteResult is a plain data carrier; construction preserves fields.
    #[test]
    fn test_write_result_creation() {
        let result = WriteResult {
            keys_written: 10,
            keys_failed: 2,
            keys_skipped: 1,
        };
        assert_eq!(result.keys_written, 10);
        assert_eq!(result.keys_failed, 2);
        assert_eq!(result.keys_skipped, 1);
    }

    // Parsing is case-insensitive; unknown modes are rejected.
    #[test]
    fn test_write_mode_from_str() {
        use std::str::FromStr;
        assert_eq!(WriteMode::from_str("fail").unwrap(), WriteMode::Fail);
        assert_eq!(WriteMode::from_str("FAIL").unwrap(), WriteMode::Fail);
        assert_eq!(WriteMode::from_str("replace").unwrap(), WriteMode::Replace);
        assert_eq!(WriteMode::from_str("Replace").unwrap(), WriteMode::Replace);
        assert_eq!(WriteMode::from_str("append").unwrap(), WriteMode::Append);
        assert_eq!(WriteMode::from_str("APPEND").unwrap(), WriteMode::Append);
        assert!(WriteMode::from_str("invalid").is_err());
    }

    // The #[default] variant is Replace.
    #[test]
    fn test_write_mode_default() {
        assert_eq!(WriteMode::default(), WriteMode::Replace);
    }

    // new() starts with zeroed counters and empty collections.
    #[test]
    fn test_write_result_detailed_new() {
        let result = WriteResultDetailed::new();
        assert_eq!(result.keys_written, 0);
        assert_eq!(result.keys_failed, 0);
        assert_eq!(result.keys_skipped, 0);
        assert!(result.succeeded_keys.is_empty());
        assert!(result.errors.is_empty());
    }

    // Success is judged solely on keys_failed == 0.
    #[test]
    fn test_write_result_detailed_complete_success() {
        let mut result = WriteResultDetailed::new();
        result.keys_written = 5;
        result.succeeded_keys = vec!["key1".into(), "key2".into()];

        assert!(result.is_complete_success());
        assert!(result.failed_keys().is_empty());
    }

    // failed_keys() and error_map() are both derived from the errors list.
    #[test]
    fn test_write_result_detailed_with_failures() {
        let mut result = WriteResultDetailed::new();
        result.keys_written = 3;
        result.keys_failed = 2;
        result.succeeded_keys = vec!["key1".into(), "key2".into(), "key3".into()];
        result.errors = vec![
            KeyError {
                key: "key4".into(),
                error: "WRONGTYPE".into(),
            },
            KeyError {
                key: "key5".into(),
                error: "OOM".into(),
            },
        ];

        assert!(!result.is_complete_success());

        let failed = result.failed_keys();
        assert_eq!(failed.len(), 2);
        assert!(failed.contains(&"key4"));
        assert!(failed.contains(&"key5"));

        let error_map = result.error_map();
        assert_eq!(error_map.get("key4"), Some(&"WRONGTYPE"));
        assert_eq!(error_map.get("key5"), Some(&"OOM"));
    }

    // to_basic() keeps the counters and drops the key/error detail.
    #[test]
    fn test_write_result_detailed_to_basic() {
        let mut result = WriteResultDetailed::new();
        result.keys_written = 10;
        result.keys_failed = 2;
        result.keys_skipped = 3;
        result.succeeded_keys = vec!["key1".into()];
        result.errors = vec![KeyError {
            key: "key2".into(),
            error: "error".into(),
        }];

        let basic = result.to_basic();
        assert_eq!(basic.keys_written, 10);
        assert_eq!(basic.keys_failed, 2);
        assert_eq!(basic.keys_skipped, 3);
    }

    // KeyError is a plain data carrier; construction preserves fields.
    #[test]
    fn test_key_error_creation() {
        let error = KeyError {
            key: "user:123".into(),
            error: "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
        };
        assert_eq!(error.key, "user:123");
        assert!(error.error.contains("WRONGTYPE"));
    }
}
1120}