DataAgent

Struct DataAgent 

Source
pub struct DataAgent { /* private fields */ }
Expand description

DataAgent: A specialized agent for data analysis and processing tasks. This agent is built on top of the TemplateAgent and provides data-specific functionality.

Implementations§

Source§

impl DataAgent

Source

pub async fn new(_config: Config) -> Result<Self, KowalskiError>

Creates a new DataAgent with the specified configuration

Examples found in repository?
examples/csv_analysis.rs (line 17)
11async fn main() -> Result<(), Box<dyn std::error::Error>> {
12    // Initialize logging
13    env_logger::init();
14
15    // Load configuration
16    let config = Config::default();
17    let mut data_agent = DataAgent::new(config).await?;
18
19    // Start a conversation
20    println!("📊 Starting CSV Analysis...");
21    let conversation_id = data_agent.start_conversation("llama3.2");
22    println!("Data Agent Conversation ID: {}", conversation_id);
23
24    // Set up the role for data analysis
25    let role = Role::new(
26        "Data Analysis Assistant",
27        "You are an expert at analyzing and interpreting data from CSV files.",
28    )
29    .with_audience(Audience::new(
30        "Data Scientist",
31        "You are speaking to a data scientist who needs detailed analysis.",
32    ))
33    .with_preset(Preset::new(
34        "Analysis",
35        "Provide comprehensive analysis with insights and recommendations.",
36    ));
37
38    // Sample CSV data for analysis
39    let csv_data = r#"name,age,city,salary,department
40John Doe,30,New York,75000,Engineering
41Jane Smith,28,San Francisco,85000,Marketing
42Bob Johnson,35,Chicago,65000,Sales
43Alice Brown,32,Boston,70000,Engineering
44Charlie Wilson,29,Seattle,80000,Engineering
45Diana Davis,31,Austin,72000,Marketing
46Eve Miller,27,Denver,68000,Sales
47Frank Garcia,33,Portland,75000,Engineering
48Grace Lee,26,Atlanta,65000,Marketing
49Henry Taylor,34,Dallas,78000,Engineering"#;
50
51    println!("\n📈 Processing CSV Data:");
52    println!("{}", csv_data);
53
54    // Process the CSV data
55    let analysis_result = data_agent.process_csv(csv_data).await?;
56
57    println!("\n📊 CSV Analysis Results:");
58    println!("Headers: {:?}", analysis_result.headers);
59    println!("Total Rows: {}", analysis_result.total_rows);
60    println!("Total Columns: {}", analysis_result.total_columns);
61    println!(
62        "Summary: {}",
63        serde_json::to_string_pretty(&analysis_result.summary)?
64    );
65
66    // Ask the agent to analyze the data
67    let analysis_prompt = format!(
68        "Please analyze this CSV data and provide insights:\n\n{}\n\nAnalysis results:\n{}",
69        csv_data,
70        serde_json::to_string_pretty(&analysis_result.summary)?
71    );
72
73    let mut response = data_agent
74        .chat_with_history(&conversation_id, &analysis_prompt, Some(role))
75        .await?;
76
77    println!("\n🤖 AI Analysis:");
78
79    // Process the streaming response
80    let mut buffer = String::new();
81    while let Some(chunk) = response.chunk().await? {
82        match data_agent
83            .process_stream_response(&conversation_id, &chunk)
84            .await
85        {
86            Ok(Some(message)) => {
87                // Print the content if it exists
88                if !message.content.is_empty() {
89                    print!("{}", message.content);
90                    io::stdout().flush()?;
91                    buffer.push_str(&message.content);
92                }
93
94                // Handle tool calls if they exist
95                if let Some(tool_calls) = &message.tool_calls {
96                    for tool_call in tool_calls {
97                        print!("\n[Tool Call] {}(", tool_call.function.name);
98                        if let Some(obj) = tool_call.function.arguments.as_object() {
99                            for (key, value) in obj {
100                                print!("{}: {}, ", key, value);
101                            }
102                        }
103                        println!(")");
104                        io::stdout().flush()?;
105                    }
106                }
107            }
108            Ok(None) => {
109                data_agent
110                    .add_message(&conversation_id, "assistant", &buffer)
111                    .await;
112                println!("\n✅ Analysis complete!\n");
113                break;
114            }
115            Err(e) => {
116                eprintln!("\n❌ Error processing stream: {}", e);
117                break;
118            }
119        }
120    }
121
122    // Ask a follow-up question about specific insights
123    let follow_up = "What are the key insights about salary distribution across departments?";
124    let mut follow_up_response = data_agent
125        .chat_with_history(&conversation_id, follow_up, None)
126        .await?;
127
128    println!("\n🔍 Follow-up Analysis:");
129    let mut buffer = String::new();
130    while let Some(chunk) = follow_up_response.chunk().await? {
131        match data_agent
132            .process_stream_response(&conversation_id, &chunk)
133            .await
134        {
135            Ok(Some(message)) => {
136                // Print the content if it exists
137                if !message.content.is_empty() {
138                    print!("{}", message.content);
139                    io::stdout().flush()?;
140                    buffer.push_str(&message.content);
141                }
142
143                // Handle tool calls if they exist
144                if let Some(tool_calls) = &message.tool_calls {
145                    for tool_call in tool_calls {
146                        print!("\n[Tool Call] {}(", tool_call.function.name);
147                        if let Some(obj) = tool_call.function.arguments.as_object() {
148                            for (key, value) in obj {
149                                print!("{}: {}, ", key, value);
150                            }
151                        }
152                        println!(")");
153                        io::stdout().flush()?;
154                    }
155                }
156            }
157            Ok(None) => {
158                data_agent
159                    .add_message(&conversation_id, "assistant", &buffer)
160                    .await;
161                println!("\n");
162                break;
163            }
164            Err(e) => {
165                eprintln!("\n❌ Error processing stream: {}", e);
166                break;
167            }
168        }
169    }
170
171    Ok(())
172}
Source

pub async fn process_csv( &self, csv_content: &str, ) -> Result<CsvAnalysisResult, KowalskiError>

Processes CSV content supplied as a string and returns a CsvAnalysisResult

Examples found in repository?
examples/csv_analysis.rs (line 55)
11async fn main() -> Result<(), Box<dyn std::error::Error>> {
12    // Initialize logging
13    env_logger::init();
14
15    // Load configuration
16    let config = Config::default();
17    let mut data_agent = DataAgent::new(config).await?;
18
19    // Start a conversation
20    println!("📊 Starting CSV Analysis...");
21    let conversation_id = data_agent.start_conversation("llama3.2");
22    println!("Data Agent Conversation ID: {}", conversation_id);
23
24    // Set up the role for data analysis
25    let role = Role::new(
26        "Data Analysis Assistant",
27        "You are an expert at analyzing and interpreting data from CSV files.",
28    )
29    .with_audience(Audience::new(
30        "Data Scientist",
31        "You are speaking to a data scientist who needs detailed analysis.",
32    ))
33    .with_preset(Preset::new(
34        "Analysis",
35        "Provide comprehensive analysis with insights and recommendations.",
36    ));
37
38    // Sample CSV data for analysis
39    let csv_data = r#"name,age,city,salary,department
40John Doe,30,New York,75000,Engineering
41Jane Smith,28,San Francisco,85000,Marketing
42Bob Johnson,35,Chicago,65000,Sales
43Alice Brown,32,Boston,70000,Engineering
44Charlie Wilson,29,Seattle,80000,Engineering
45Diana Davis,31,Austin,72000,Marketing
46Eve Miller,27,Denver,68000,Sales
47Frank Garcia,33,Portland,75000,Engineering
48Grace Lee,26,Atlanta,65000,Marketing
49Henry Taylor,34,Dallas,78000,Engineering"#;
50
51    println!("\n📈 Processing CSV Data:");
52    println!("{}", csv_data);
53
54    // Process the CSV data
55    let analysis_result = data_agent.process_csv(csv_data).await?;
56
57    println!("\n📊 CSV Analysis Results:");
58    println!("Headers: {:?}", analysis_result.headers);
59    println!("Total Rows: {}", analysis_result.total_rows);
60    println!("Total Columns: {}", analysis_result.total_columns);
61    println!(
62        "Summary: {}",
63        serde_json::to_string_pretty(&analysis_result.summary)?
64    );
65
66    // Ask the agent to analyze the data
67    let analysis_prompt = format!(
68        "Please analyze this CSV data and provide insights:\n\n{}\n\nAnalysis results:\n{}",
69        csv_data,
70        serde_json::to_string_pretty(&analysis_result.summary)?
71    );
72
73    let mut response = data_agent
74        .chat_with_history(&conversation_id, &analysis_prompt, Some(role))
75        .await?;
76
77    println!("\n🤖 AI Analysis:");
78
79    // Process the streaming response
80    let mut buffer = String::new();
81    while let Some(chunk) = response.chunk().await? {
82        match data_agent
83            .process_stream_response(&conversation_id, &chunk)
84            .await
85        {
86            Ok(Some(message)) => {
87                // Print the content if it exists
88                if !message.content.is_empty() {
89                    print!("{}", message.content);
90                    io::stdout().flush()?;
91                    buffer.push_str(&message.content);
92                }
93
94                // Handle tool calls if they exist
95                if let Some(tool_calls) = &message.tool_calls {
96                    for tool_call in tool_calls {
97                        print!("\n[Tool Call] {}(", tool_call.function.name);
98                        if let Some(obj) = tool_call.function.arguments.as_object() {
99                            for (key, value) in obj {
100                                print!("{}: {}, ", key, value);
101                            }
102                        }
103                        println!(")");
104                        io::stdout().flush()?;
105                    }
106                }
107            }
108            Ok(None) => {
109                data_agent
110                    .add_message(&conversation_id, "assistant", &buffer)
111                    .await;
112                println!("\n✅ Analysis complete!\n");
113                break;
114            }
115            Err(e) => {
116                eprintln!("\n❌ Error processing stream: {}", e);
117                break;
118            }
119        }
120    }
121
122    // Ask a follow-up question about specific insights
123    let follow_up = "What are the key insights about salary distribution across departments?";
124    let mut follow_up_response = data_agent
125        .chat_with_history(&conversation_id, follow_up, None)
126        .await?;
127
128    println!("\n🔍 Follow-up Analysis:");
129    let mut buffer = String::new();
130    while let Some(chunk) = follow_up_response.chunk().await? {
131        match data_agent
132            .process_stream_response(&conversation_id, &chunk)
133            .await
134        {
135            Ok(Some(message)) => {
136                // Print the content if it exists
137                if !message.content.is_empty() {
138                    print!("{}", message.content);
139                    io::stdout().flush()?;
140                    buffer.push_str(&message.content);
141                }
142
143                // Handle tool calls if they exist
144                if let Some(tool_calls) = &message.tool_calls {
145                    for tool_call in tool_calls {
146                        print!("\n[Tool Call] {}(", tool_call.function.name);
147                        if let Some(obj) = tool_call.function.arguments.as_object() {
148                            for (key, value) in obj {
149                                print!("{}: {}, ", key, value);
150                            }
151                        }
152                        println!(")");
153                        io::stdout().flush()?;
154                    }
155                }
156            }
157            Ok(None) => {
158                data_agent
159                    .add_message(&conversation_id, "assistant", &buffer)
160                    .await;
161                println!("\n");
162                break;
163            }
164            Err(e) => {
165                eprintln!("\n❌ Error processing stream: {}", e);
166                break;
167            }
168        }
169    }
170
171    Ok(())
172}
Source

pub async fn analyze_data( &self, csv_content: &str, ) -> Result<Value, KowalskiError>

Analyzes the supplied CSV content and returns statistics about the data as a Value

Trait Implementations§

Source§

impl Agent for DataAgent

Source§

fn new<'async_trait>( config: Config, ) -> Pin<Box<dyn Future<Output = Result<Self, KowalskiError>> + Send + 'async_trait>>
where Self: 'async_trait,

Creates a new agent with the specified configuration.
Source§

fn start_conversation(&mut self, model: &str) -> String

Starts a new conversation
Source§

fn get_conversation(&self, id: &str) -> Option<&Conversation>

Gets a conversation by ID
Source§

fn list_conversations(&self) -> Vec<&Conversation>

Lists all conversations
Source§

fn delete_conversation(&mut self, id: &str) -> bool

Deletes a conversation
Source§

fn chat_with_history<'life0, 'life1, 'life2, 'async_trait>( &'life0 mut self, conversation_id: &'life1 str, content: &'life2 str, role: Option<Role>, ) -> Pin<Box<dyn Future<Output = Result<Response, KowalskiError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Chats within the conversation identified by conversation_id, using its history and an optional Role
Source§

fn process_stream_response<'life0, 'life1, 'life2, 'async_trait>( &'life0 mut self, conversation_id: &'life1 str, chunk: &'life2 [u8], ) -> Pin<Box<dyn Future<Output = Result<Option<Message>, KowalskiError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Processes a stream response
Source§

fn add_message<'life0, 'life1, 'life2, 'life3, 'async_trait>( &'life0 mut self, conversation_id: &'life1 str, role: &'life2 str, content: &'life3 str, ) -> Pin<Box<dyn Future<Output = ()> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait, 'life3: 'async_trait,

Adds a message to a conversation
Source§

fn name(&self) -> &str

Gets the agent’s name
Source§

fn description(&self) -> &str

Gets the agent’s description

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

impl<T> Pointable for T

Source§

const ALIGN: usize

The alignment of the pointer.
Source§

type Init = T

The type for initializers.
Source§

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes an object with the given initializer. Read more
Source§

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer. Read more
Source§

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer. Read more
Source§

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer. Read more
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

impl<T> ErasedDestructor for T
where T: 'static,

Source§

impl<T> MaybeSendSync for T