use crate::agents::agent::AgentGPT;
use crate::common::utils::{ClientType, Message, Status, Task};
use crate::traits::agent::Agent;
use crate::traits::functions::{AsyncFunctions, Functions};
use anyhow::{Result, anyhow};
use colored::*;
use nylas::client::Nylas;
use nylas::messages::Message as NylasMessage;
use std::borrow::Cow;
use std::env::var;
use tracing::{debug, info};
#[cfg(feature = "mem")]
use {
crate::common::memory::load_long_term_memory, crate::common::memory::long_term_memory_context,
crate::common::memory::save_long_term_memory,
};
#[cfg(feature = "oai")]
use {openai_dive::v1::models::FlagshipModel, openai_dive::v1::resources::chat::*};
#[cfg(feature = "cld")]
use anthropic_ai_sdk::types::message::{
ContentBlock, CreateMessageParams, Message as AnthMessage, MessageClient,
RequiredMessageParams, Role,
};
#[cfg(feature = "gem")]
use gems::{
chat::ChatBuilder,
messages::{Content, Message as GemMessage},
traits::CTrait,
};
#[cfg(any(
feature = "co",
feature = "oai",
feature = "gem",
feature = "cld",
feature = "xai"
))]
use crate::traits::functions::ReqResponse;
#[cfg(feature = "xai")]
use x_ai::{
chat_compl::{ChatCompletionsRequestBuilder, Message as XaiMessage},
traits::ChatCompletionsFetcher,
};
use async_trait::async_trait;
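/// An email-focused agent that reads messages through the Nylas API and
/// summarizes or answers questions about them with whichever LLM backend
/// (`gem`, `oai`, `cld`, `xai`, or `co`) is enabled at build time.
///
/// Illustrative usage sketch only; the surrounding crate paths and feature
/// flags are assumed, and the `NYLAS_*` environment variables must be set:
///
/// ```ignore
/// let mut mailer = MailerGPT::new("Mailer", "Summarize my inbox").await;
/// let summary = mailer
///     .generate_text_from_emails("Summarize today's unread emails")
///     .await?;
/// println!("{summary}");
/// ```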
pub struct MailerGPT {
agent: AgentGPT,
nylas_client: Nylas,
client: ClientType,
}
impl MailerGPT {
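/// Builds a new `MailerGPT` from environment variables.
///
/// Reads `NYLAS_CLIENT_ID`, `NYLAS_CLIENT_SECRET`, and `NYLAS_ACCESS_TOKEN`
/// to construct the Nylas client, and selects the LLM backend via
/// `ClientType::from_env()`. Panics if the Nylas client cannot be created.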
pub async fn new(persona: &'static str, behavior: &'static str) -> Self {
let mut agent: AgentGPT = AgentGPT::new_borrowed(persona, behavior);
agent.id = agent.persona().to_string().into();
let client_id = var("NYLAS_CLIENT_ID").unwrap_or_default();
let client_secret = var("NYLAS_CLIENT_SECRET").unwrap_or_default();
let access_token = var("NYLAS_ACCESS_TOKEN").unwrap_or_default();
let nylas_client = Nylas::new(&client_id, &client_secret, Some(&access_token))
.await
.expect("Failed to initialize Nylas client; check the NYLAS_* environment variables");
let client = ClientType::from_env();
info!(
"{}",
format!("[*] {:?}: 🛠️ Getting ready!", agent.persona(),)
.bright_white()
.bold()
);
Self {
agent,
nylas_client,
client,
}
}
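/// Fetches all messages from the Nylas account and returns those after
/// index 95 (an empty vector if the mailbox holds fewer messages).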
pub async fn get_latest_emails(&mut self) -> Result<Vec<NylasMessage>> {
let messages = self
.nylas_client
.messages()
.all()
.await
.map_err(|err| anyhow!("Failed to fetch messages from Nylas: {err:?}"))?;
info!(
"[*] {:?}: Read {:?} Messages",
self.agent.persona(),
messages.len()
);
// Keep only messages past index 95; clamp the start so a smaller mailbox
// does not cause an out-of-bounds panic.
let start = messages.len().min(95);
Ok(messages[start..].to_vec())
}
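/// Generates text from the latest emails using the configured LLM backend,
/// recording the exchange in the agent's message history (and in long-term
/// memory when the `mem` feature is enabled).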
pub async fn generate_text_from_emails(&mut self, prompt: &str) -> Result<String> {
self.agent.add_message(Message {
role: Cow::Borrowed("user"),
content: Cow::Owned(format!(
"Requested to generate text based on emails with prompt: '{prompt}'"
)),
});
#[cfg(feature = "mem")]
{
let _ = self
.save_ltm(Message {
role: Cow::Borrowed("user"),
content: Cow::Owned(format!(
"Requested to generate text based on emails with prompt: '{prompt}'"
)),
})
.await;
}
let emails = match self.get_latest_emails().await {
Ok(e) => e,
Err(err) => {
let error_msg = format!("Failed to fetch latest emails: {err}");
self.agent.add_message(Message {
role: Cow::Borrowed("system"),
content: Cow::Owned(error_msg.clone()),
});
#[cfg(feature = "mem")]
{
let _ = self
.save_ltm(Message {
role: Cow::Borrowed("system"),
content: Cow::Owned(error_msg.clone()),
})
.await;
}
return Err(anyhow!(error_msg));
}
};
self.agent.add_message(Message {
role: Cow::Borrowed("assistant"),
content: Cow::Owned(
"Analyzing latest emails and generating text based on provided prompt..."
.to_string(),
),
});
#[cfg(feature = "mem")]
{
let _ = self
.save_ltm(Message {
role: Cow::Borrowed("assistant"),
content: Cow::Owned(
"Analyzing latest emails and generating text based on provided prompt..."
.to_string(),
),
})
.await;
}
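// Dispatch to whichever provider client `ClientType::from_env()` selected;
// the fallback arm below reports an error when no backend feature is enabled.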
let generated_text = match &mut self.client {
#[cfg(feature = "gem")]
ClientType::Gemini(gem_client) => {
let parameters = ChatBuilder::default()
.messages(vec![GemMessage::User {
content: Content::Text(format!(
"User Request:{prompt}\n\nEmails:{emails:?}"
)),
name: None,
}])
.build()?;
let result = gem_client.chat().generate(parameters).await;
match result {
Ok(response) => response,
Err(err) => {
let error_msg = format!("Failed to generate content from emails: {err}");
self.agent.add_message(Message {
role: Cow::Borrowed("system"),
content: Cow::Owned(error_msg.clone()),
});
#[cfg(feature = "mem")]
{
let _ = self
.save_ltm(Message {
role: Cow::Borrowed("system"),
content: Cow::Owned(error_msg.clone()),
})
.await;
}
return Err(anyhow!(error_msg));
}
}
}
#[cfg(feature = "oai")]
ClientType::OpenAI(oai_client) => {
let parameters = ChatCompletionParametersBuilder::default()
.model(FlagshipModel::Gpt4O.to_string())
.messages(vec![ChatMessage::User {
content: ChatMessageContent::Text(format!(
"User Request:{prompt}\n\nEmails:{emails:?}"
)),
name: None,
}])
.response_format(ChatCompletionResponseFormat::Text)
.build()?;
let result = oai_client.chat().create(parameters).await;
match result {
Ok(chat_response) => {
let message = &chat_response.choices[0].message;
match message {
ChatMessage::Assistant {
content: Some(chat_content),
..
} => chat_content.to_string(),
ChatMessage::User { content, .. } => content.to_string(),
ChatMessage::System { content, .. } => content.to_string(),
ChatMessage::Developer { content, .. } => content.to_string(),
ChatMessage::Tool { content, .. } => content.clone(),
_ => String::from(""),
}
}
Err(err) => {
let error_msg = format!("Failed to generate content from emails: {err}");
self.agent.add_message(Message {
role: Cow::Borrowed("system"),
content: Cow::Owned(error_msg.clone()),
});
#[cfg(feature = "mem")]
{
let _ = self
.save_ltm(Message {
role: Cow::Borrowed("system"),
content: Cow::Owned(error_msg.clone()),
})
.await;
}
return Err(anyhow!(error_msg));
}
}
}
#[cfg(feature = "cld")]
ClientType::Anthropic(client) => {
let body = CreateMessageParams::new(RequiredMessageParams {
model: "claude-3-7-sonnet-latest".to_string(),
messages: vec![AnthMessage::new_text(
Role::User,
format!("User Request:{prompt}\n\nEmails:{emails:?}"),
)],
max_tokens: 1024,
});
match client.create_message(Some(&body)).await {
Ok(chat_response) => chat_response
.content
.iter()
.filter_map(|block| match block {
ContentBlock::Text { text, .. } => Some(text),
_ => None,
})
.cloned()
.collect::<Vec<_>>()
.join("\n"),
Err(err) => {
let error_msg =
format!("Failed to generate content from Claude API: {err}");
self.agent.add_message(Message {
role: Cow::Borrowed("system"),
content: Cow::Owned(error_msg.clone()),
});
#[cfg(feature = "mem")]
{
let _ = self
.save_ltm(Message {
role: Cow::Borrowed("system"),
content: Cow::Owned(error_msg.clone()),
})
.await;
}
return Err(anyhow!(error_msg));
}
}
}
#[cfg(feature = "xai")]
ClientType::Xai(xai_client) => {
let messages = vec![XaiMessage {
role: "user".into(),
content: format!("User Request:{prompt}\n\nEmails:{emails:?}"),
}];
let rb = ChatCompletionsRequestBuilder::new(
xai_client.clone(),
"grok-beta".into(),
messages,
)
.temperature(0.0)
.stream(false);
let req = rb.clone().build()?;
let resp = rb.create_chat_completion(req).await;
match resp {
Ok(chat) => {
let response_text = chat.choices[0].message.content.clone();
self.agent.add_message(Message {
role: Cow::Borrowed("assistant"),
content: Cow::Owned(response_text.clone()),
});
#[cfg(feature = "mem")]
{
let _ = self
.save_ltm(Message {
role: Cow::Borrowed("assistant"),
content: Cow::Owned(response_text.clone()),
})
.await;
}
#[cfg(debug_assertions)]
debug!(
"[*] {:?}: Got XAI Output: {:?}",
self.agent.persona(),
response_text
);
response_text
}
Err(err) => {
let err_msg = format!("Failed to generate content from emails: {err}");
self.agent.add_message(Message {
role: Cow::Borrowed("assistant"),
content: Cow::Owned(err_msg.clone()),
});
#[cfg(feature = "mem")]
{
let _ = self
.save_ltm(Message {
role: Cow::Borrowed("assistant"),
content: Cow::Owned(err_msg.clone()),
})
.await;
}
return Err(anyhow!(err_msg));
}
}
}
#[cfg(feature = "co")]
ClientType::Cohere(co_client) => {
use cohere_rust::api::GenerateModel;
use cohere_rust::api::generate::GenerateRequest;
let prompt_text = format!("User Request:{prompt}\n\nEmails:{emails:?}");
let gen_request = GenerateRequest {
prompt: &prompt_text,
model: Some(GenerateModel::Custom("command-a-03-2025".to_string())),
max_tokens: Some(2048),
..Default::default()
};
match co_client.generate(&gen_request).await {
Ok(generations) => generations
.iter()
.map(|g| g.text.as_str())
.collect::<Vec<_>>()
.join(""),
Err(e) => {
let error_msg = format!("Failed to generate content via Cohere API: {e:?}");
self.agent.add_message(Message {
role: Cow::Borrowed("system"),
content: Cow::Owned(error_msg.clone()),
});
return Err(anyhow!(error_msg));
}
}
}
#[allow(unreachable_patterns)]
_ => {
return Err(anyhow!(
"No valid AI client configured. Enable `co`, `gem`, `oai`, `cld`, or `xai` feature."
));
}
};
self.agent.add_message(Message {
role: Cow::Borrowed("assistant"),
content: Cow::Owned(
"Generated text from emails based on the given prompt.".to_string(),
),
});
#[cfg(feature = "mem")]
{
let _ = self
.save_ltm(Message {
role: Cow::Borrowed("assistant"),
content: Cow::Owned(
"Generated text from emails based on the given prompt.".to_string(),
),
})
.await;
}
info!(
"[*] {:?}: Got Response: {:?}",
self.agent.persona(),
generated_text
);
Ok(generated_text)
}
}
impl Functions for MailerGPT {
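/// Returns a reference to the wrapped `AgentGPT`.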
fn get_agent(&self) -> &AgentGPT {
&self.agent
}
}
#[async_trait]
impl AsyncFunctions for MailerGPT {
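/// Executes the given task: logs its sub-steps, then generates text from the
/// latest emails using the task description as the prompt, marking the agent
/// as `Completed` when done.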
async fn execute<'a>(
&'a mut self,
task: &'a mut Task,
_execute: bool,
_browse: bool,
_max_tries: u64,
) -> Result<()> {
info!(
"{}",
format!("[*] {:?}: Executing task:", self.agent.persona(),)
.bright_white()
.bold()
);
for step in task.description.split("- ") {
if !step.trim().is_empty() {
info!("{} {}", "•".bright_white().bold(), step.trim().cyan());
}
}
let mut _count = 0;
while self.agent.status() != &Status::Completed {
match self.agent.status() {
Status::Idle => {
debug!("[*] {:?}: Idle", self.agent.persona());
let _generated_text = self.generate_text_from_emails(&task.description).await?;
_count += 1;
self.agent.update(Status::Completed);
}
_ => {
self.agent.update(Status::Completed);
}
}
}
Ok(())
}
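/// Generic text-generation hook; not used by `MailerGPT`, so it returns an
/// empty string.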
#[cfg(any(
feature = "co",
feature = "oai",
feature = "gem",
feature = "cld",
feature = "xai"
))]
async fn generate(&mut self, _request: &str) -> Result<String> {
Ok("".to_string())
}
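/// Persists a message to this agent's long-term memory (requires the `mem` feature).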
#[cfg(feature = "mem")]
async fn save_ltm(&mut self, message: Message) -> Result<()> {
save_long_term_memory(&mut self.client, self.agent.id.clone(), message).await
}
#[cfg(feature = "mem")]
async fn get_ltm(&self) -> Result<Vec<Message>> {
load_long_term_memory(self.agent.id.clone()).await
}
#[cfg(feature = "mem")]
async fn ltm_context(&self) -> String {
long_term_memory_context(self.agent.id.clone()).await
}
#[cfg(any(
feature = "co",
feature = "oai",
feature = "gem",
feature = "cld",
feature = "xai"
))]
async fn imagen(&mut self, _request: &str) -> Result<Vec<u8>> {
Ok(Default::default())
}
#[cfg(any(
feature = "co",
feature = "oai",
feature = "gem",
feature = "cld",
feature = "xai"
))]
async fn stream(&mut self, _request: &str) -> Result<ReqResponse> {
Ok(ReqResponse(None))
}
}