polars_redis/
write.rs

1//! Write support for Redis data structures.
2//!
3//! This module provides functionality to write Polars DataFrames to Redis
4//! as hashes or JSON documents.
5//!
6//! Write operations use Redis pipelining for improved performance, processing
7//! keys in configurable batches (default: 1000 keys per batch).
8//!
9//! # Error Granularity
10//!
11//! By default, write operations report aggregate success/failure counts.
12//! For per-key error details, use the `_detailed` variants which return
13//! [`WriteResultDetailed`] with information about which specific keys failed.
14
15use std::collections::{HashMap, HashSet};
16
17use redis::Value;
18use tokio::runtime::Runtime;
19
20use crate::connection::RedisConnection;
21use crate::error::{Error, Result};
22
23/// Default batch size for pipelined write operations.
24const DEFAULT_WRITE_BATCH_SIZE: usize = 1000;
25
/// Write mode for handling existing keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum WriteMode {
    /// Fail if any key already exists.
    ///
    /// NOTE(review): the write functions in this module implement this by
    /// skipping keys that already exist (counted in `keys_skipped`) rather
    /// than aborting the entire operation.
    Fail,
    /// Replace existing keys (default behavior).
    #[default]
    Replace,
    /// Append to existing keys (for hashes: merge fields; for JSON/strings: same as replace).
    Append,
}
37
38impl std::str::FromStr for WriteMode {
39    type Err = Error;
40
41    fn from_str(s: &str) -> Result<Self> {
42        match s.to_lowercase().as_str() {
43            "fail" => Ok(WriteMode::Fail),
44            "replace" => Ok(WriteMode::Replace),
45            "append" => Ok(WriteMode::Append),
46            _ => Err(Error::InvalidInput(format!(
47                "Invalid write mode '{}'. Expected: fail, replace, or append",
48                s
49            ))),
50        }
51    }
52}
53
/// Result of a write operation (basic).
///
/// Counts are aggregated over the whole call; for per-key detail use the
/// `_detailed` function variants, which return `WriteResultDetailed`.
#[derive(Debug, Clone)]
pub struct WriteResult {
    /// Number of keys successfully written.
    pub keys_written: usize,
    /// Number of keys that failed to write.
    pub keys_failed: usize,
    /// Number of keys skipped because they already exist (when mode is Fail).
    pub keys_skipped: usize,
}
64
/// Error information for a single key.
///
/// Produced by the `_detailed` write variants to report exactly which key
/// failed and why.
#[derive(Debug, Clone)]
pub struct KeyError {
    /// The Redis key that failed.
    pub key: String,
    /// The error message from Redis.
    pub error: String,
}
73
/// Detailed result of a write operation with per-key error information.
///
/// This provides granular error reporting for production workflows where
/// partial success is acceptable and retry logic is needed.
///
/// # Example
///
/// ```ignore
/// let result = write_hashes_detailed(url, keys, fields, values, None, WriteMode::Replace)?;
///
/// println!("Succeeded: {}", result.keys_written);
/// println!("Failed: {}", result.keys_failed);
///
/// for error in &result.errors {
///     eprintln!("Key {} failed: {}", error.key, error.error);
/// }
///
/// // Get list of failed keys for retry
/// let failed_keys: Vec<&str> = result.failed_keys();
/// ```
#[derive(Debug, Clone)]
pub struct WriteResultDetailed {
    /// Number of keys successfully written.
    /// Kept in sync with `succeeded_keys.len()` by the writers in this module.
    pub keys_written: usize,
    /// Number of keys that failed to write.
    /// Kept in sync with `errors.len()` by the writers in this module.
    pub keys_failed: usize,
    /// Number of keys skipped because they already exist (when mode is Fail).
    pub keys_skipped: usize,
    /// List of keys that were successfully written.
    pub succeeded_keys: Vec<String>,
    /// Detailed error information for each failed key.
    pub errors: Vec<KeyError>,
}
107
108impl WriteResultDetailed {
109    /// Create a new empty detailed result.
110    pub fn new() -> Self {
111        Self {
112            keys_written: 0,
113            keys_failed: 0,
114            keys_skipped: 0,
115            succeeded_keys: Vec::new(),
116            errors: Vec::new(),
117        }
118    }
119
120    /// Get a list of keys that failed to write.
121    pub fn failed_keys(&self) -> Vec<&str> {
122        self.errors.iter().map(|e| e.key.as_str()).collect()
123    }
124
125    /// Get a map of failed keys to their error messages.
126    pub fn error_map(&self) -> HashMap<&str, &str> {
127        self.errors
128            .iter()
129            .map(|e| (e.key.as_str(), e.error.as_str()))
130            .collect()
131    }
132
133    /// Check if all keys were written successfully.
134    pub fn is_complete_success(&self) -> bool {
135        self.keys_failed == 0
136    }
137
138    /// Convert to a basic WriteResult (discards per-key details).
139    pub fn to_basic(&self) -> WriteResult {
140        WriteResult {
141            keys_written: self.keys_written,
142            keys_failed: self.keys_failed,
143            keys_skipped: self.keys_skipped,
144        }
145    }
146}
147
impl Default for WriteResultDetailed {
    /// Equivalent to [`WriteResultDetailed::new`]: an empty result.
    fn default() -> Self {
        Self::new()
    }
}
153
154/// Write hashes to Redis from field data.
155///
156/// # Arguments
157/// * `url` - Redis connection URL
158/// * `keys` - List of Redis keys to write to
159/// * `fields` - List of field names
160/// * `values` - 2D list of values (rows x columns), same order as fields
161/// * `ttl` - Optional TTL in seconds for each key
162/// * `if_exists` - How to handle existing keys (fail, replace, append)
163///
164/// # Returns
165/// A `WriteResult` with the number of keys written.
166pub fn write_hashes(
167    url: &str,
168    keys: Vec<String>,
169    fields: Vec<String>,
170    values: Vec<Vec<Option<String>>>,
171    ttl: Option<i64>,
172    if_exists: WriteMode,
173) -> Result<WriteResult> {
174    let runtime =
175        Runtime::new().map_err(|e| Error::Runtime(format!("Failed to create runtime: {}", e)))?;
176
177    let connection = RedisConnection::new(url)?;
178
179    runtime.block_on(async {
180        let mut conn = connection.get_async_connection().await?;
181        write_hashes_async(&mut conn, keys, fields, values, ttl, if_exists).await
182    })
183}
184
/// Async implementation of hash writing with pipelining.
///
/// Keys are processed in batches of [`DEFAULT_WRITE_BATCH_SIZE`]. Per batch:
/// 1. In `Fail` mode, a pipelined `EXISTS` pass records which keys already
///    exist; those are skipped and counted in `keys_skipped`.
/// 2. In `Replace` mode, all batch keys are deleted up front so stale hash
///    fields do not survive the rewrite.
/// 3. A single pipeline of `HSET` (plus optional `EXPIRE`) commands is issued
///    for the remaining keys.
///
/// Accounting is per batch, not per key: if the write pipeline errors, every
/// queued key in that batch is counted as failed.
async fn write_hashes_async(
    conn: &mut redis::aio::MultiplexedConnection,
    keys: Vec<String>,
    fields: Vec<String>,
    values: Vec<Vec<Option<String>>>,
    ttl: Option<i64>,
    if_exists: WriteMode,
) -> Result<WriteResult> {
    let mut keys_written = 0;
    let mut keys_failed = 0;
    let mut keys_skipped = 0;

    // Process in batches for better performance
    for batch_start in (0..keys.len()).step_by(DEFAULT_WRITE_BATCH_SIZE) {
        let batch_end = (batch_start + DEFAULT_WRITE_BATCH_SIZE).min(keys.len());
        let batch_keys = &keys[batch_start..batch_end];

        // For Fail mode, check existence of all keys in batch first
        let existing_keys: HashSet<usize> = if if_exists == WriteMode::Fail {
            let mut pipe = redis::pipe();
            for key in batch_keys {
                pipe.exists(key);
            }
            // NOTE(review): if the EXISTS pipeline itself errors, this falls
            // back to "nothing exists" and the batch is written anyway --
            // Fail mode is best-effort under connection errors.
            let exists_results: Vec<bool> = pipe.query_async(conn).await.unwrap_or_default();
            exists_results
                .into_iter()
                .enumerate()
                .filter_map(|(i, exists)| if exists { Some(i) } else { None })
                .collect()
        } else {
            HashSet::new()
        };

        // For Replace mode, delete existing keys in batch
        if if_exists == WriteMode::Replace {
            // Deletion errors are deliberately ignored; the HSET pass below
            // still runs.
            let mut del_pipe = redis::pipe();
            for key in batch_keys {
                del_pipe.del(key).ignore();
            }
            let _ = del_pipe.query_async::<()>(conn).await;
        }

        // Build pipeline for HSET operations
        let mut pipe = redis::pipe();
        // Batch-local indices of keys that actually had commands queued.
        let mut batch_indices: Vec<usize> = Vec::new();

        for (batch_idx, key) in batch_keys.iter().enumerate() {
            let global_idx = batch_start + batch_idx;

            // Skip if key exists and mode is Fail
            if existing_keys.contains(&batch_idx) {
                keys_skipped += 1;
                continue;
            }

            // NOTE(review): keys beyond the end of `values` are silently
            // dropped (neither written, failed, nor skipped) -- confirm
            // callers always pass keys.len() == values.len().
            if global_idx >= values.len() {
                break;
            }

            let row = &values[global_idx];
            let mut hash_data: Vec<(&str, &str)> = Vec::new();

            // Pair each field with its non-null value for this row; None
            // values mean the field is omitted from the hash entirely.
            for (j, field) in fields.iter().enumerate() {
                if j < row.len()
                    && let Some(value) = &row[j]
                {
                    hash_data.push((field.as_str(), value.as_str()));
                }
            }

            // Rows whose fields are all null queue no commands at all.
            if !hash_data.is_empty() {
                pipe.hset_multiple(key, &hash_data);
                if let Some(seconds) = ttl {
                    pipe.expire(key, seconds);
                }
                batch_indices.push(batch_idx);
            }
        }

        // Execute pipeline if there are commands
        if !batch_indices.is_empty() {
            match pipe.query_async::<()>(conn).await {
                Ok(_) => keys_written += batch_indices.len(),
                Err(_) => keys_failed += batch_indices.len(),
            }
        }
    }

    Ok(WriteResult {
        keys_written,
        keys_failed,
        keys_skipped,
    })
}
280
281/// Write hashes to Redis with detailed per-key error reporting.
282///
283/// This is similar to [`write_hashes`] but returns detailed information about
284/// which specific keys succeeded or failed, enabling retry logic and better
285/// error handling in production workflows.
286///
287/// # Arguments
288/// * `url` - Redis connection URL
289/// * `keys` - List of Redis keys to write to
290/// * `fields` - List of field names
291/// * `values` - 2D list of values (rows x columns), same order as fields
292/// * `ttl` - Optional TTL in seconds for each key
293/// * `if_exists` - How to handle existing keys (fail, replace, append)
294///
295/// # Returns
296/// A [`WriteResultDetailed`] with per-key success/failure information.
297///
298/// # Example
299///
300/// ```ignore
301/// let result = write_hashes_detailed(
302///     "redis://localhost:6379",
303///     keys,
304///     fields,
305///     values,
306///     None,
307///     WriteMode::Replace,
308/// )?;
309///
310/// if !result.is_complete_success() {
311///     for error in &result.errors {
312///         eprintln!("Failed to write {}: {}", error.key, error.error);
313///     }
314/// }
315/// ```
316pub fn write_hashes_detailed(
317    url: &str,
318    keys: Vec<String>,
319    fields: Vec<String>,
320    values: Vec<Vec<Option<String>>>,
321    ttl: Option<i64>,
322    if_exists: WriteMode,
323) -> Result<WriteResultDetailed> {
324    let runtime =
325        Runtime::new().map_err(|e| Error::Runtime(format!("Failed to create runtime: {}", e)))?;
326
327    let connection = RedisConnection::new(url)?;
328
329    runtime.block_on(async {
330        let mut conn = connection.get_async_connection().await?;
331        write_hashes_detailed_async(&mut conn, keys, fields, values, ttl, if_exists).await
332    })
333}
334
/// Async implementation of detailed hash writing with per-key error tracking.
///
/// Same batching strategy as [`write_hashes_async`], but the number of
/// pipeline commands queued per key is recorded so responses can be mapped
/// back to individual keys, populating `succeeded_keys` and `errors`.
async fn write_hashes_detailed_async(
    conn: &mut redis::aio::MultiplexedConnection,
    keys: Vec<String>,
    fields: Vec<String>,
    values: Vec<Vec<Option<String>>>,
    ttl: Option<i64>,
    if_exists: WriteMode,
) -> Result<WriteResultDetailed> {
    let mut result = WriteResultDetailed::new();
    // Process in batches for better performance
    for batch_start in (0..keys.len()).step_by(DEFAULT_WRITE_BATCH_SIZE) {
        let batch_end = (batch_start + DEFAULT_WRITE_BATCH_SIZE).min(keys.len());
        let batch_keys = &keys[batch_start..batch_end];

        // For Fail mode, check existence of all keys in batch first
        let existing_keys: HashSet<usize> = if if_exists == WriteMode::Fail {
            let mut pipe = redis::pipe();
            for key in batch_keys {
                pipe.exists(key);
            }
            // NOTE(review): an error in the EXISTS pipeline is swallowed and
            // treated as "no keys exist", so Fail mode may overwrite keys
            // under transient connection errors.
            let exists_results: Vec<bool> = pipe.query_async(conn).await.unwrap_or_default();
            exists_results
                .into_iter()
                .enumerate()
                .filter_map(|(i, exists)| if exists { Some(i) } else { None })
                .collect()
        } else {
            HashSet::new()
        };

        // For Replace mode, delete existing keys in batch
        if if_exists == WriteMode::Replace {
            let mut del_pipe = redis::pipe();
            for key in batch_keys {
                del_pipe.del(key).ignore();
            }
            let _ = del_pipe.query_async::<()>(conn).await;
        }

        // Build pipeline for HSET operations, tracking which key each command belongs to
        let mut pipe = redis::pipe();
        // Track (key_string, commands_for_this_key) for each key in the pipeline
        let mut key_command_counts: Vec<(String, usize)> = Vec::new();

        for (batch_idx, key) in batch_keys.iter().enumerate() {
            let global_idx = batch_start + batch_idx;

            // Skip if key exists and mode is Fail
            if existing_keys.contains(&batch_idx) {
                result.keys_skipped += 1;
                continue;
            }

            // NOTE(review): keys without a matching row in `values` are
            // silently dropped -- confirm callers pass equal-length inputs.
            if global_idx >= values.len() {
                break;
            }

            let row = &values[global_idx];
            let mut hash_data: Vec<(&str, &str)> = Vec::new();

            // Collect (field, value) pairs, omitting null values entirely.
            for (j, field) in fields.iter().enumerate() {
                if j < row.len()
                    && let Some(value) = &row[j]
                {
                    hash_data.push((field.as_str(), value.as_str()));
                }
            }

            if !hash_data.is_empty() {
                pipe.hset_multiple(key, &hash_data);
                // 1 command for HSET, plus 1 for EXPIRE when a TTL is set;
                // the count is what lets us map responses back to this key.
                let mut cmd_count = 1;
                if let Some(seconds) = ttl {
                    pipe.expire(key, seconds);
                    cmd_count += 1;
                }
                key_command_counts.push((key.clone(), cmd_count));
            }
        }

        // Execute pipeline and collect individual results
        if !key_command_counts.is_empty() {
            match pipe.query_async::<Vec<Value>>(conn).await {
                Ok(responses) => {
                    // Process responses, mapping back to keys.
                    // NOTE(review): this relies on the pipeline returning one
                    // `Value` per command (including `ServerError` entries)
                    // rather than failing the whole query on the first error
                    // -- verify against the redis crate version in use.
                    let mut response_idx = 0;
                    for (key, cmd_count) in &key_command_counts {
                        let mut key_succeeded = true;
                        let mut key_error = String::new();

                        // Check all commands for this key; the last error
                        // observed wins.
                        for _ in 0..*cmd_count {
                            if response_idx < responses.len() {
                                if let Value::ServerError(err) = &responses[response_idx] {
                                    key_succeeded = false;
                                    key_error = err.to_string();
                                }
                                response_idx += 1;
                            }
                        }

                        if key_succeeded {
                            result.keys_written += 1;
                            result.succeeded_keys.push(key.clone());
                        } else {
                            result.keys_failed += 1;
                            result.errors.push(KeyError {
                                key: key.clone(),
                                error: key_error,
                            });
                        }
                    }
                }
                Err(e) => {
                    // Entire pipeline failed - mark all keys as failed
                    for (key, _) in key_command_counts {
                        result.keys_failed += 1;
                        result.errors.push(KeyError {
                            key,
                            error: e.to_string(),
                        });
                    }
                }
            }
        }
    }

    Ok(result)
}
464
465/// Write JSON documents to Redis.
466///
467/// # Arguments
468/// * `url` - Redis connection URL
469/// * `keys` - List of Redis keys to write to
470/// * `json_strings` - List of JSON strings to write
471/// * `ttl` - Optional TTL in seconds for each key
472/// * `if_exists` - How to handle existing keys (fail, replace, append)
473///
474/// # Returns
475/// A `WriteResult` with the number of keys written.
476pub fn write_json(
477    url: &str,
478    keys: Vec<String>,
479    json_strings: Vec<String>,
480    ttl: Option<i64>,
481    if_exists: WriteMode,
482) -> Result<WriteResult> {
483    let runtime =
484        Runtime::new().map_err(|e| Error::Runtime(format!("Failed to create runtime: {}", e)))?;
485
486    let connection = RedisConnection::new(url)?;
487
488    runtime.block_on(async {
489        let mut conn = connection.get_async_connection().await?;
490        write_json_async(&mut conn, keys, json_strings, ttl, if_exists).await
491    })
492}
493
494/// Async implementation of JSON writing with pipelining.
495async fn write_json_async(
496    conn: &mut redis::aio::MultiplexedConnection,
497    keys: Vec<String>,
498    json_strings: Vec<String>,
499    ttl: Option<i64>,
500    if_exists: WriteMode,
501) -> Result<WriteResult> {
502    let mut keys_written = 0;
503    let mut keys_failed = 0;
504    let mut keys_skipped = 0;
505
506    let items: Vec<_> = keys.iter().zip(json_strings.iter()).collect();
507
508    // Process in batches for better performance
509    for batch_start in (0..items.len()).step_by(DEFAULT_WRITE_BATCH_SIZE) {
510        let batch_end = (batch_start + DEFAULT_WRITE_BATCH_SIZE).min(items.len());
511        let batch_items = &items[batch_start..batch_end];
512
513        // For Fail mode, check existence of all keys in batch first
514        let existing_keys: HashSet<usize> = if if_exists == WriteMode::Fail {
515            let mut pipe = redis::pipe();
516            for (key, _) in batch_items {
517                pipe.exists(*key);
518            }
519            let exists_results: Vec<bool> = pipe.query_async(conn).await.unwrap_or_default();
520            exists_results
521                .into_iter()
522                .enumerate()
523                .filter_map(|(i, exists)| if exists { Some(i) } else { None })
524                .collect()
525        } else {
526            HashSet::new()
527        };
528
529        // Build pipeline for JSON.SET operations
530        let mut pipe = redis::pipe();
531        let mut batch_count = 0;
532
533        for (batch_idx, (key, json_str)) in batch_items.iter().enumerate() {
534            // Skip if key exists and mode is Fail
535            if existing_keys.contains(&batch_idx) {
536                keys_skipped += 1;
537                continue;
538            }
539
540            pipe.cmd("JSON.SET").arg(*key).arg("$").arg(*json_str);
541            if let Some(seconds) = ttl {
542                pipe.expire(*key, seconds);
543            }
544            batch_count += 1;
545        }
546
547        // Execute pipeline if there are commands
548        if batch_count > 0 {
549            match pipe.query_async::<()>(conn).await {
550                Ok(_) => keys_written += batch_count,
551                Err(_) => keys_failed += batch_count,
552            }
553        }
554    }
555
556    Ok(WriteResult {
557        keys_written,
558        keys_failed,
559        keys_skipped,
560    })
561}
562
563/// Write string values to Redis.
564///
565/// # Arguments
566/// * `url` - Redis connection URL
567/// * `keys` - List of Redis keys to write to
568/// * `values` - List of string values to write
569/// * `ttl` - Optional TTL in seconds for each key
570/// * `if_exists` - How to handle existing keys (fail, replace, append)
571///
572/// # Returns
573/// A `WriteResult` with the number of keys written.
574pub fn write_strings(
575    url: &str,
576    keys: Vec<String>,
577    values: Vec<Option<String>>,
578    ttl: Option<i64>,
579    if_exists: WriteMode,
580) -> Result<WriteResult> {
581    let runtime =
582        Runtime::new().map_err(|e| Error::Runtime(format!("Failed to create runtime: {}", e)))?;
583
584    let connection = RedisConnection::new(url)?;
585
586    runtime.block_on(async {
587        let mut conn = connection.get_async_connection().await?;
588        write_strings_async(&mut conn, keys, values, ttl, if_exists).await
589    })
590}
591
592/// Async implementation of string writing with pipelining.
593async fn write_strings_async(
594    conn: &mut redis::aio::MultiplexedConnection,
595    keys: Vec<String>,
596    values: Vec<Option<String>>,
597    ttl: Option<i64>,
598    if_exists: WriteMode,
599) -> Result<WriteResult> {
600    let mut keys_written = 0;
601    let mut keys_failed = 0;
602    let mut keys_skipped = 0;
603
604    // Collect non-null items
605    let items: Vec<_> = keys
606        .iter()
607        .zip(values.iter())
608        .filter_map(|(k, v)| v.as_ref().map(|val| (k, val)))
609        .collect();
610
611    // Process in batches for better performance
612    for batch_start in (0..items.len()).step_by(DEFAULT_WRITE_BATCH_SIZE) {
613        let batch_end = (batch_start + DEFAULT_WRITE_BATCH_SIZE).min(items.len());
614        let batch_items = &items[batch_start..batch_end];
615
616        // For Fail mode, check existence of all keys in batch first
617        let existing_keys: HashSet<usize> = if if_exists == WriteMode::Fail {
618            let mut pipe = redis::pipe();
619            for (key, _) in batch_items {
620                pipe.exists(*key);
621            }
622            let exists_results: Vec<bool> = pipe.query_async(conn).await.unwrap_or_default();
623            exists_results
624                .into_iter()
625                .enumerate()
626                .filter_map(|(i, exists)| if exists { Some(i) } else { None })
627                .collect()
628        } else {
629            HashSet::new()
630        };
631
632        // Build pipeline for SET operations
633        let mut pipe = redis::pipe();
634        let mut batch_count = 0;
635
636        for (batch_idx, (key, val)) in batch_items.iter().enumerate() {
637            // Skip if key exists and mode is Fail
638            if existing_keys.contains(&batch_idx) {
639                keys_skipped += 1;
640                continue;
641            }
642
643            // For Append mode on strings, we could use APPEND command,
644            // but that concatenates strings which is probably not what users want.
645            // So we treat Append same as Replace for strings.
646            if let Some(seconds) = ttl {
647                // SETEX for atomic set with TTL
648                pipe.cmd("SETEX").arg(*key).arg(seconds).arg(*val);
649            } else {
650                pipe.set(*key, *val);
651            }
652            batch_count += 1;
653        }
654
655        // Execute pipeline if there are commands
656        if batch_count > 0 {
657            match pipe.query_async::<()>(conn).await {
658                Ok(_) => keys_written += batch_count,
659                Err(_) => keys_failed += batch_count,
660            }
661        }
662    }
663
664    Ok(WriteResult {
665        keys_written,
666        keys_failed,
667        keys_skipped,
668    })
669}
670
671/// Write list elements to Redis.
672///
673/// # Arguments
674/// * `url` - Redis connection URL
675/// * `keys` - List of Redis keys to write to
676/// * `elements` - 2D list of elements for each list
677/// * `ttl` - Optional TTL in seconds for each key
678/// * `if_exists` - How to handle existing keys (fail, replace, append)
679///
680/// # Returns
681/// A `WriteResult` with the number of keys written.
682pub fn write_lists(
683    url: &str,
684    keys: Vec<String>,
685    elements: Vec<Vec<String>>,
686    ttl: Option<i64>,
687    if_exists: WriteMode,
688) -> Result<WriteResult> {
689    let runtime =
690        Runtime::new().map_err(|e| Error::Runtime(format!("Failed to create runtime: {}", e)))?;
691
692    let connection = RedisConnection::new(url)?;
693
694    runtime.block_on(async {
695        let mut conn = connection.get_async_connection().await?;
696        write_lists_async(&mut conn, keys, elements, ttl, if_exists).await
697    })
698}
699
/// Async implementation of list writing with pipelining.
///
/// Keys are zipped with their element vectors (excess on either side is
/// dropped) and written in batches of [`DEFAULT_WRITE_BATCH_SIZE`]. Replace
/// mode deletes batch keys first so RPUSH starts from an empty list; Append
/// mode pushes onto whatever already exists.
async fn write_lists_async(
    conn: &mut redis::aio::MultiplexedConnection,
    keys: Vec<String>,
    elements: Vec<Vec<String>>,
    ttl: Option<i64>,
    if_exists: WriteMode,
) -> Result<WriteResult> {
    let mut keys_written = 0;
    let mut keys_failed = 0;
    let mut keys_skipped = 0;

    let items: Vec<_> = keys.iter().zip(elements.iter()).collect();

    // Process in batches for better performance
    for batch_start in (0..items.len()).step_by(DEFAULT_WRITE_BATCH_SIZE) {
        let batch_end = (batch_start + DEFAULT_WRITE_BATCH_SIZE).min(items.len());
        let batch_items = &items[batch_start..batch_end];

        // For Fail mode, check existence of all keys in batch first
        let existing_keys: HashSet<usize> = if if_exists == WriteMode::Fail {
            let mut pipe = redis::pipe();
            for (key, _) in batch_items {
                pipe.exists(*key);
            }
            // NOTE(review): an EXISTS pipeline error is swallowed and treated
            // as "nothing exists", so Fail mode is best-effort.
            let exists_results: Vec<bool> = pipe.query_async(conn).await.unwrap_or_default();
            exists_results
                .into_iter()
                .enumerate()
                .filter_map(|(i, exists)| if exists { Some(i) } else { None })
                .collect()
        } else {
            HashSet::new()
        };

        // For Replace mode, delete existing keys in batch
        if if_exists == WriteMode::Replace {
            let mut del_pipe = redis::pipe();
            for (key, _) in batch_items {
                del_pipe.del(*key).ignore();
            }
            let _ = del_pipe.query_async::<()>(conn).await;
        }

        // Build pipeline for RPUSH operations
        let mut pipe = redis::pipe();
        let mut batch_count = 0;

        for (batch_idx, (key, list_elements)) in batch_items.iter().enumerate() {
            // Skip if key exists and mode is Fail
            if existing_keys.contains(&batch_idx) {
                keys_skipped += 1;
                continue;
            }

            // Skip empty lists (not counted as written, failed, or skipped).
            if list_elements.is_empty() {
                continue;
            }

            // One RPUSH per key; redis-rs expands the Vec so each element
            // becomes a separate RPUSH argument.
            pipe.rpush(*key, *list_elements);
            if let Some(seconds) = ttl {
                pipe.expire(*key, seconds);
            }
            batch_count += 1;
        }

        // Execute pipeline if there are commands
        if batch_count > 0 {
            match pipe.query_async::<()>(conn).await {
                Ok(_) => keys_written += batch_count,
                Err(_) => keys_failed += batch_count,
            }
        }
    }

    Ok(WriteResult {
        keys_written,
        keys_failed,
        keys_skipped,
    })
}
782
783/// Write set members to Redis.
784///
785/// # Arguments
786/// * `url` - Redis connection URL
787/// * `keys` - List of Redis keys to write to
788/// * `members` - 2D list of members for each set
789/// * `ttl` - Optional TTL in seconds for each key
790/// * `if_exists` - How to handle existing keys (fail, replace, append)
791///
792/// # Returns
793/// A `WriteResult` with the number of keys written.
794pub fn write_sets(
795    url: &str,
796    keys: Vec<String>,
797    members: Vec<Vec<String>>,
798    ttl: Option<i64>,
799    if_exists: WriteMode,
800) -> Result<WriteResult> {
801    let runtime =
802        Runtime::new().map_err(|e| Error::Runtime(format!("Failed to create runtime: {}", e)))?;
803
804    let connection = RedisConnection::new(url)?;
805
806    runtime.block_on(async {
807        let mut conn = connection.get_async_connection().await?;
808        write_sets_async(&mut conn, keys, members, ttl, if_exists).await
809    })
810}
811
/// Async implementation of set writing with pipelining.
///
/// Keys are zipped with their member vectors (excess on either side is
/// dropped) and written in batches of [`DEFAULT_WRITE_BATCH_SIZE`]. Replace
/// mode deletes batch keys first so SADD starts from an empty set; Append
/// mode merges members into whatever already exists.
async fn write_sets_async(
    conn: &mut redis::aio::MultiplexedConnection,
    keys: Vec<String>,
    members: Vec<Vec<String>>,
    ttl: Option<i64>,
    if_exists: WriteMode,
) -> Result<WriteResult> {
    let mut keys_written = 0;
    let mut keys_failed = 0;
    let mut keys_skipped = 0;

    let items: Vec<_> = keys.iter().zip(members.iter()).collect();

    // Process in batches for better performance
    for batch_start in (0..items.len()).step_by(DEFAULT_WRITE_BATCH_SIZE) {
        let batch_end = (batch_start + DEFAULT_WRITE_BATCH_SIZE).min(items.len());
        let batch_items = &items[batch_start..batch_end];

        // For Fail mode, check existence of all keys in batch first
        let existing_keys: HashSet<usize> = if if_exists == WriteMode::Fail {
            let mut pipe = redis::pipe();
            for (key, _) in batch_items {
                pipe.exists(*key);
            }
            // NOTE(review): an EXISTS pipeline error is swallowed and treated
            // as "nothing exists", so Fail mode is best-effort.
            let exists_results: Vec<bool> = pipe.query_async(conn).await.unwrap_or_default();
            exists_results
                .into_iter()
                .enumerate()
                .filter_map(|(i, exists)| if exists { Some(i) } else { None })
                .collect()
        } else {
            HashSet::new()
        };

        // For Replace mode, delete existing keys in batch
        if if_exists == WriteMode::Replace {
            let mut del_pipe = redis::pipe();
            for (key, _) in batch_items {
                del_pipe.del(*key).ignore();
            }
            let _ = del_pipe.query_async::<()>(conn).await;
        }

        // Build pipeline for SADD operations
        let mut pipe = redis::pipe();
        let mut batch_count = 0;

        for (batch_idx, (key, set_members)) in batch_items.iter().enumerate() {
            // Skip if key exists and mode is Fail
            if existing_keys.contains(&batch_idx) {
                keys_skipped += 1;
                continue;
            }

            // Skip empty sets (not counted as written, failed, or skipped).
            if set_members.is_empty() {
                continue;
            }

            // One SADD per key; redis-rs expands the Vec so each member
            // becomes a separate SADD argument.
            pipe.sadd(*key, *set_members);
            if let Some(seconds) = ttl {
                pipe.expire(*key, seconds);
            }
            batch_count += 1;
        }

        // Execute pipeline if there are commands
        if batch_count > 0 {
            match pipe.query_async::<()>(conn).await {
                Ok(_) => keys_written += batch_count,
                Err(_) => keys_failed += batch_count,
            }
        }
    }

    Ok(WriteResult {
        keys_written,
        keys_failed,
        keys_skipped,
    })
}
894
895/// Write sorted set members to Redis.
896///
897/// # Arguments
898/// * `url` - Redis connection URL
899/// * `keys` - List of Redis keys to write to
900/// * `members` - 2D list of (member, score) pairs for each sorted set
901/// * `ttl` - Optional TTL in seconds for each key
902/// * `if_exists` - How to handle existing keys (fail, replace, append)
903///
904/// # Returns
905/// A `WriteResult` with the number of keys written.
906pub fn write_zsets(
907    url: &str,
908    keys: Vec<String>,
909    members: Vec<Vec<(String, f64)>>,
910    ttl: Option<i64>,
911    if_exists: WriteMode,
912) -> Result<WriteResult> {
913    let runtime =
914        Runtime::new().map_err(|e| Error::Runtime(format!("Failed to create runtime: {}", e)))?;
915
916    let connection = RedisConnection::new(url)?;
917
918    runtime.block_on(async {
919        let mut conn = connection.get_async_connection().await?;
920        write_zsets_async(&mut conn, keys, members, ttl, if_exists).await
921    })
922}
923
924/// Async implementation of sorted set writing with pipelining.
925async fn write_zsets_async(
926    conn: &mut redis::aio::MultiplexedConnection,
927    keys: Vec<String>,
928    members: Vec<Vec<(String, f64)>>,
929    ttl: Option<i64>,
930    if_exists: WriteMode,
931) -> Result<WriteResult> {
932    let mut keys_written = 0;
933    let mut keys_failed = 0;
934    let mut keys_skipped = 0;
935
936    let items: Vec<_> = keys.iter().zip(members.iter()).collect();
937
938    // Process in batches for better performance
939    for batch_start in (0..items.len()).step_by(DEFAULT_WRITE_BATCH_SIZE) {
940        let batch_end = (batch_start + DEFAULT_WRITE_BATCH_SIZE).min(items.len());
941        let batch_items = &items[batch_start..batch_end];
942
943        // For Fail mode, check existence of all keys in batch first
944        let existing_keys: HashSet<usize> = if if_exists == WriteMode::Fail {
945            let mut pipe = redis::pipe();
946            for (key, _) in batch_items {
947                pipe.exists(*key);
948            }
949            let exists_results: Vec<bool> = pipe.query_async(conn).await.unwrap_or_default();
950            exists_results
951                .into_iter()
952                .enumerate()
953                .filter_map(|(i, exists)| if exists { Some(i) } else { None })
954                .collect()
955        } else {
956            HashSet::new()
957        };
958
959        // For Replace mode, delete existing keys in batch
960        if if_exists == WriteMode::Replace {
961            let mut del_pipe = redis::pipe();
962            for (key, _) in batch_items {
963                del_pipe.del(*key).ignore();
964            }
965            let _ = del_pipe.query_async::<()>(conn).await;
966        }
967
968        // Build pipeline for ZADD operations
969        let mut pipe = redis::pipe();
970        let mut batch_count = 0;
971
972        for (batch_idx, (key, zset_members)) in batch_items.iter().enumerate() {
973            // Skip if key exists and mode is Fail
974            if existing_keys.contains(&batch_idx) {
975                keys_skipped += 1;
976                continue;
977            }
978
979            // Skip empty sorted sets
980            if zset_members.is_empty() {
981                continue;
982            }
983
984            // ZADD expects (score, member) pairs
985            let score_members: Vec<(f64, &str)> =
986                zset_members.iter().map(|(m, s)| (*s, m.as_str())).collect();
987
988            pipe.zadd_multiple(*key, &score_members);
989            if let Some(seconds) = ttl {
990                pipe.expire(*key, seconds);
991            }
992            batch_count += 1;
993        }
994
995        // Execute pipeline if there are commands
996        if batch_count > 0 {
997            match pipe.query_async::<()>(conn).await {
998                Ok(_) => keys_written += batch_count,
999                Err(_) => keys_failed += batch_count,
1000            }
1001        }
1002    }
1003
1004    Ok(WriteResult {
1005        keys_written,
1006        keys_failed,
1007        keys_skipped,
1008    })
1009}
1010
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_write_result_creation() {
        let result = WriteResult {
            keys_written: 10,
            keys_failed: 2,
            keys_skipped: 1,
        };
        assert_eq!(
            (result.keys_written, result.keys_failed, result.keys_skipped),
            (10, 2, 1)
        );
    }

    #[test]
    fn test_write_mode_from_str() {
        use std::str::FromStr;
        // Parsing is case-insensitive; exercise each mode in two casings.
        let cases = [
            ("fail", WriteMode::Fail),
            ("FAIL", WriteMode::Fail),
            ("replace", WriteMode::Replace),
            ("Replace", WriteMode::Replace),
            ("append", WriteMode::Append),
            ("APPEND", WriteMode::Append),
        ];
        for (input, expected) in cases {
            assert_eq!(WriteMode::from_str(input).unwrap(), expected);
        }
        assert!(WriteMode::from_str("invalid").is_err());
    }

    #[test]
    fn test_write_mode_default() {
        assert!(matches!(WriteMode::default(), WriteMode::Replace));
    }

    #[test]
    fn test_write_result_detailed_new() {
        let fresh = WriteResultDetailed::new();
        assert_eq!(
            (fresh.keys_written, fresh.keys_failed, fresh.keys_skipped),
            (0, 0, 0)
        );
        assert!(fresh.succeeded_keys.is_empty());
        assert!(fresh.errors.is_empty());
    }

    #[test]
    fn test_write_result_detailed_complete_success() {
        let mut detailed = WriteResultDetailed::new();
        detailed.keys_written = 5;
        detailed.succeeded_keys = vec!["key1".into(), "key2".into()];

        assert!(detailed.is_complete_success());
        assert!(detailed.failed_keys().is_empty());
    }

    #[test]
    fn test_write_result_detailed_with_failures() {
        let mut detailed = WriteResultDetailed::new();
        detailed.keys_written = 3;
        detailed.keys_failed = 2;
        detailed.succeeded_keys = vec!["key1".into(), "key2".into(), "key3".into()];
        detailed.errors = [("key4", "WRONGTYPE"), ("key5", "OOM")]
            .into_iter()
            .map(|(k, e)| KeyError {
                key: k.into(),
                error: e.into(),
            })
            .collect();

        assert!(!detailed.is_complete_success());

        let failed = detailed.failed_keys();
        assert_eq!(failed.len(), 2);
        assert!(failed.contains(&"key4") && failed.contains(&"key5"));

        let error_map = detailed.error_map();
        assert_eq!(error_map.get("key4"), Some(&"WRONGTYPE"));
        assert_eq!(error_map.get("key5"), Some(&"OOM"));
    }

    #[test]
    fn test_write_result_detailed_to_basic() {
        let mut detailed = WriteResultDetailed::new();
        detailed.keys_written = 10;
        detailed.keys_failed = 2;
        detailed.keys_skipped = 3;
        detailed.succeeded_keys = vec!["key1".into()];
        detailed.errors = vec![KeyError {
            key: "key2".into(),
            error: "error".into(),
        }];

        // Counts survive the conversion; per-key detail is dropped.
        let basic = detailed.to_basic();
        assert_eq!(
            (basic.keys_written, basic.keys_failed, basic.keys_skipped),
            (10, 2, 3)
        );
    }

    #[test]
    fn test_key_error_creation() {
        let err = KeyError {
            key: "user:123".into(),
            error: "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
        };
        assert_eq!(err.key, "user:123");
        assert!(err.error.contains("WRONGTYPE"));
    }
}