rrag_graph/nodes/
transform.rs

1//! # Transform Node Implementation
2//!
3//! Transform nodes modify and process data in the state.
4
5use crate::core::{ExecutionContext, ExecutionResult, Node, NodeId};
6use crate::state::{GraphState, StateValue};
7use crate::{RGraphError, RGraphResult};
8use async_trait::async_trait;
9
10#[cfg(feature = "serde")]
11use serde::{Deserialize, Serialize};
12
13/// Configuration for transform nodes
14#[derive(Debug, Clone)]
15#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
16pub struct TransformNodeConfig {
17    pub input_key: String,
18    pub output_key: String,
19    pub transform_type: TransformType,
20}
21
22/// Types of transformations
23#[derive(Debug, Clone)]
24#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
25pub enum TransformType {
26    /// Convert to uppercase
27    ToUpperCase,
28    /// Convert to lowercase
29    ToLowerCase,
30    /// Extract substring
31    Substring { start: usize, length: Option<usize> },
32    /// Replace text
33    Replace { from: String, to: String },
34    /// JSON parse
35    JsonParse,
36    /// JSON stringify
37    JsonStringify,
38}
39
40/// A node that transforms data
41pub struct TransformNode {
42    id: NodeId,
43    name: String,
44    config: TransformNodeConfig,
45}
46
47impl TransformNode {
48    pub fn new(
49        id: impl Into<NodeId>,
50        name: impl Into<String>,
51        config: TransformNodeConfig,
52    ) -> Self {
53        Self {
54            id: id.into(),
55            name: name.into(),
56            config,
57        }
58    }
59
60    fn apply_transform(&self, input: &StateValue) -> RGraphResult<StateValue> {
61        match &self.config.transform_type {
62            TransformType::ToUpperCase => {
63                if let Some(s) = input.as_string() {
64                    Ok(StateValue::String(s.to_uppercase()))
65                } else {
66                    Err(RGraphError::node(
67                        self.id.as_str(),
68                        "ToUpperCase requires string input",
69                    ))
70                }
71            }
72            TransformType::ToLowerCase => {
73                if let Some(s) = input.as_string() {
74                    Ok(StateValue::String(s.to_lowercase()))
75                } else {
76                    Err(RGraphError::node(
77                        self.id.as_str(),
78                        "ToLowerCase requires string input",
79                    ))
80                }
81            }
82            TransformType::Substring { start, length } => {
83                if let Some(s) = input.as_string() {
84                    let end = length.map(|l| start + l).unwrap_or(s.len());
85                    let substring = s.chars().skip(*start).take(end - start).collect::<String>();
86                    Ok(StateValue::String(substring))
87                } else {
88                    Err(RGraphError::node(
89                        self.id.as_str(),
90                        "Substring requires string input",
91                    ))
92                }
93            }
94            TransformType::Replace { from, to } => {
95                if let Some(s) = input.as_string() {
96                    Ok(StateValue::String(s.replace(from, to)))
97                } else {
98                    Err(RGraphError::node(
99                        self.id.as_str(),
100                        "Replace requires string input",
101                    ))
102                }
103            }
104            TransformType::JsonParse => {
105                if let Some(s) = input.as_string() {
106                    match serde_json::from_str::<serde_json::Value>(s) {
107                        Ok(json) => Ok(StateValue::from(json)),
108                        Err(e) => Err(RGraphError::node(
109                            self.id.as_str(),
110                            format!("JSON parse error: {}", e),
111                        )),
112                    }
113                } else {
114                    Err(RGraphError::node(
115                        self.id.as_str(),
116                        "JsonParse requires string input",
117                    ))
118                }
119            }
120            TransformType::JsonStringify => {
121                let json_value: serde_json::Value = input.clone().into();
122                match serde_json::to_string(&json_value) {
123                    Ok(json_str) => Ok(StateValue::String(json_str)),
124                    Err(e) => Err(RGraphError::node(
125                        self.id.as_str(),
126                        format!("JSON stringify error: {}", e),
127                    )),
128                }
129            }
130        }
131    }
132}
133
134#[async_trait]
135impl Node for TransformNode {
136    async fn execute(
137        &self,
138        state: &mut GraphState,
139        context: &ExecutionContext,
140    ) -> RGraphResult<ExecutionResult> {
141        // Get input value
142        let input_value = state.get(&self.config.input_key)?;
143
144        // Apply transformation
145        let output_value = self.apply_transform(&input_value)?;
146
147        // Store output
148        state.set_with_context(
149            context.current_node.as_str(),
150            &self.config.output_key,
151            output_value,
152        );
153
154        Ok(ExecutionResult::Continue)
155    }
156
157    fn id(&self) -> &NodeId {
158        &self.id
159    }
160
161    fn name(&self) -> &str {
162        &self.name
163    }
164
165    fn input_keys(&self) -> Vec<&str> {
166        vec![&self.config.input_key]
167    }
168
169    fn output_keys(&self) -> Vec<&str> {
170        vec![&self.config.output_key]
171    }
172}