csv_analysis/
csv_analysis.rs

1use env_logger;
2use kowalski_core::{
3    agent::Agent,
4    config::Config,
5    role::{Audience, Preset, Role},
6};
7use kowalski_data_agent::agent::DataAgent;
8use std::io::{self, Write};
9
10#[tokio::main]
11async fn main() -> Result<(), Box<dyn std::error::Error>> {
12    // Initialize logging
13    env_logger::init();
14
15    // Load configuration
16    let config = Config::default();
17    let mut data_agent = DataAgent::new(config).await?;
18
19    // Start a conversation
20    println!("šŸ“Š Starting CSV Analysis...");
21    let conversation_id = data_agent.start_conversation("llama3.2");
22    println!("Data Agent Conversation ID: {}", conversation_id);
23
24    // Set up the role for data analysis
25    let role = Role::new(
26        "Data Analysis Assistant",
27        "You are an expert at analyzing and interpreting data from CSV files.",
28    )
29    .with_audience(Audience::new(
30        "Data Scientist",
31        "You are speaking to a data scientist who needs detailed analysis.",
32    ))
33    .with_preset(Preset::new(
34        "Analysis",
35        "Provide comprehensive analysis with insights and recommendations.",
36    ));
37
38    // Sample CSV data for analysis
39    let csv_data = r#"name,age,city,salary,department
40John Doe,30,New York,75000,Engineering
41Jane Smith,28,San Francisco,85000,Marketing
42Bob Johnson,35,Chicago,65000,Sales
43Alice Brown,32,Boston,70000,Engineering
44Charlie Wilson,29,Seattle,80000,Engineering
45Diana Davis,31,Austin,72000,Marketing
46Eve Miller,27,Denver,68000,Sales
47Frank Garcia,33,Portland,75000,Engineering
48Grace Lee,26,Atlanta,65000,Marketing
49Henry Taylor,34,Dallas,78000,Engineering"#;
50
51    println!("\nšŸ“ˆ Processing CSV Data:");
52    println!("{}", csv_data);
53
54    // Process the CSV data
55    let analysis_result = data_agent.process_csv(csv_data).await?;
56
57    println!("\nšŸ“Š CSV Analysis Results:");
58    println!("Headers: {:?}", analysis_result.headers);
59    println!("Total Rows: {}", analysis_result.total_rows);
60    println!("Total Columns: {}", analysis_result.total_columns);
61    println!(
62        "Summary: {}",
63        serde_json::to_string_pretty(&analysis_result.summary)?
64    );
65
66    // Ask the agent to analyze the data
67    let analysis_prompt = format!(
68        "Please analyze this CSV data and provide insights:\n\n{}\n\nAnalysis results:\n{}",
69        csv_data,
70        serde_json::to_string_pretty(&analysis_result.summary)?
71    );
72
73    let mut response = data_agent
74        .chat_with_history(&conversation_id, &analysis_prompt, Some(role))
75        .await?;
76
77    println!("\nšŸ¤– AI Analysis:");
78
79    // Process the streaming response
80    let mut buffer = String::new();
81    while let Some(chunk) = response.chunk().await? {
82        match data_agent
83            .process_stream_response(&conversation_id, &chunk)
84            .await
85        {
86            Ok(Some(message)) => {
87                // Print the content if it exists
88                if !message.content.is_empty() {
89                    print!("{}", message.content);
90                    io::stdout().flush()?;
91                    buffer.push_str(&message.content);
92                }
93
94                // Handle tool calls if they exist
95                if let Some(tool_calls) = &message.tool_calls {
96                    for tool_call in tool_calls {
97                        print!("\n[Tool Call] {}(", tool_call.function.name);
98                        if let Some(obj) = tool_call.function.arguments.as_object() {
99                            for (key, value) in obj {
100                                print!("{}: {}, ", key, value);
101                            }
102                        }
103                        println!(")");
104                        io::stdout().flush()?;
105                    }
106                }
107            }
108            Ok(None) => {
109                data_agent
110                    .add_message(&conversation_id, "assistant", &buffer)
111                    .await;
112                println!("\nāœ… Analysis complete!\n");
113                break;
114            }
115            Err(e) => {
116                eprintln!("\nāŒ Error processing stream: {}", e);
117                break;
118            }
119        }
120    }
121
122    // Ask a follow-up question about specific insights
123    let follow_up = "What are the key insights about salary distribution across departments?";
124    let mut follow_up_response = data_agent
125        .chat_with_history(&conversation_id, follow_up, None)
126        .await?;
127
128    println!("\nšŸ” Follow-up Analysis:");
129    let mut buffer = String::new();
130    while let Some(chunk) = follow_up_response.chunk().await? {
131        match data_agent
132            .process_stream_response(&conversation_id, &chunk)
133            .await
134        {
135            Ok(Some(message)) => {
136                // Print the content if it exists
137                if !message.content.is_empty() {
138                    print!("{}", message.content);
139                    io::stdout().flush()?;
140                    buffer.push_str(&message.content);
141                }
142
143                // Handle tool calls if they exist
144                if let Some(tool_calls) = &message.tool_calls {
145                    for tool_call in tool_calls {
146                        print!("\n[Tool Call] {}(", tool_call.function.name);
147                        if let Some(obj) = tool_call.function.arguments.as_object() {
148                            for (key, value) in obj {
149                                print!("{}: {}, ", key, value);
150                            }
151                        }
152                        println!(")");
153                        io::stdout().flush()?;
154                    }
155                }
156            }
157            Ok(None) => {
158                data_agent
159                    .add_message(&conversation_id, "assistant", &buffer)
160                    .await;
161                println!("\n");
162                break;
163            }
164            Err(e) => {
165                eprintln!("\nāŒ Error processing stream: {}", e);
166                break;
167            }
168        }
169    }
170
171    Ok(())
172}