use colored::*;
use std::io;
use std::io::Write;
use std::path::PathBuf;
use std::time::Duration;
use tokio::fs;
use futures::{stream, StreamExt};
use dialoguer::Confirm;
use indicatif::{ProgressBar, ProgressStyle};
use crate::zotero::collection::models::Collection;
use tracing::{event, info, span, Instrument, Level};
use crate::anythingllm::workspace::models::Workspace;
use crate::app::commands;
use crate::zotero::item::models::Item;
use crate::Config;
use eyre::eyre;
use std::sync::Arc;
use tokio::sync::Mutex;
pub async fn import_zotero(collection_name: String) -> eyre::Result<()> {
println!("Importing documents from '{}'", collection_name);
let collection = match get_collection(&collection_name).await {
Ok(collection) => collection,
Err(e) => {
return Err(e);
}
};
let pdfs = match get_pdfs_from_collection(&collection).await {
Ok(docs) => docs,
Err(e) => {
return Err(e);
}
};
let pdf_file_paths = file_paths(pdfs);
let workspace = match create_or_get_workspace("zotero", &collection_name).await {
Ok(workspace) => workspace,
Err(e) => {
return Err(e);
}
};
let uploaded_pdf_ids = match upload_pdfs(pdf_file_paths).await {
Ok(uploaded_pdf_ids) => uploaded_pdf_ids,
Err(e) => {
return Err(e);
}
};
if embed_pdfs(&workspace, uploaded_pdf_ids).await.is_err() {
return Err(eyre!("Error embedding PDFs"));
}
println!("done");
Ok(())
}
pub async fn import_folder(folder_path: PathBuf) -> eyre::Result<()> {
println!("Importing documents from {}", folder_path.display());
let pdfs = match get_pdf_filepaths_from_directory(&folder_path).await {
Ok(pdfs) => pdfs,
Err(e) => {
return Err(e);
}
};
let folder_name = folder_path.file_name().unwrap().to_string_lossy();
let workspace = match create_or_get_workspace("folder", &folder_name).await {
Ok(workspace) => workspace,
Err(e) => {
return Err(e);
}
};
let uploaded_pdf_ids = match upload_pdfs(pdfs).await {
Ok(uploaded_pdf_ids) => uploaded_pdf_ids,
Err(e) => {
return Err(e);
}
};
if embed_pdfs(&workspace, uploaded_pdf_ids).await.is_err() {
return Err(eyre!("Error embedding PDFs"));
}
println!("done");
Ok(())
}
/// Stub: currently performs no work and always returns `Ok(())`.
///
/// NOTE(review): presumably intended to import a single item (guess from the
/// name only) — confirm the intended behaviour before relying on it.
pub async fn import_item() -> eyre::Result<()> {
Ok(())
}
/// A terminal spinner with a status message, wrapping an `indicatif` `ProgressBar`.
#[allow(dead_code)] // `message` is stored but never read in the code visible here
struct Spinner {
/// The message the spinner was created with.
message: String,
/// The underlying progress bar that is ticked and eventually finished.
spinner: ProgressBar,
}
impl Spinner {
    /// Creates a spinner that displays `message` and redraws every 100 ms.
    fn new(message: String) -> Spinner {
        let spinner_style = ProgressStyle::default_spinner()
            .template("{spinner:.green} {msg}")
            .expect("spinner template is a valid constant");
        // One clone is required: the bar takes ownership of its message and
        // the struct keeps its own copy.
        let spinner = ProgressBar::new_spinner()
            .with_message(message.clone())
            .with_style(spinner_style);
        spinner.enable_steady_tick(Duration::from_millis(100));
        Spinner { message, spinner }
    }

    /// Finishes the spinner, showing `message` in green.
    fn finish_ok(&self, message: String) {
        self.finish_with_template("{spinner:.green} {msg:.green}", message);
    }

    /// Finishes the spinner, showing `message` in red.
    fn finish_error(&self, message: String) {
        self.finish_with_template("{spinner:.green} {msg:.red}", message);
    }

    /// Applies `template` as the spinner style, then finishes with `message`.
    fn finish_with_template(&self, template: &str, message: String) {
        self.spinner.set_style(
            ProgressStyle::default_spinner()
                .template(template)
                .expect("spinner templates are valid constants"),
        );
        self.spinner.finish_with_message(message);
    }
}
/// Selects the kind of change requested from an embeddings update.
///
/// In this file only `Adds` is used: it is passed to `update_embeddings`
/// together with the uploaded documents when embedding them in a workspace.
pub enum UpdateParameter {
/// Presumably: the listed documents are added to the workspace — confirm against `update_embeddings`.
Adds,
/// Presumably: the listed documents are removed from the workspace — confirm against `update_embeddings`.
Deletes,
}
/// Looks up the Zotero collection called `collection_name`.
///
/// Progress and the outcome are shown on a terminal spinner.
///
/// # Errors
///
/// Any failure of the lookup is reported as `"Error checking collection"`;
/// the underlying error is discarded.
pub async fn get_collection(collection_name: &str) -> eyre::Result<Collection> {
    let sp = Spinner::new("Checking collection...".to_string());
    let zotero = commands::zotero_client();

    // `collection_name` is already a `&str`; no extra borrow is needed.
    match zotero.collection_from_name(collection_name).await {
        Ok(collection) => {
            sp.finish_ok("Collection OK".to_string());
            Ok(collection)
        }
        Err(_) => {
            sp.finish_error("Collection not found. Run 'zotero list-collections'".to_string());
            Err(eyre!("Error checking collection"))
        }
    }
}
pub async fn get_pdfs_from_collection(collection: &Collection) -> eyre::Result<Vec<Item>> {
let sp = Spinner::new("Checking PDFs...".to_string());
let zotero = commands::zotero_client();
let items: Vec<Item> = zotero
.get_collections_collection_key_items_batched(collection.clone().key)
.collect()
.await;
let pdfs: Vec<Item> = items.into_iter().filter(|item| item.is_pdf()).collect();
if !pdfs.is_empty() {
sp.finish_ok(format!("{} PDFS found", pdfs.len()));
Ok(pdfs)
} else {
sp.finish_error("No PDFS found in collection".to_string());
Err(eyre!("Error getting PDFs"))
}
}
async fn get_pdf_filepaths_from_directory(folder_path: &PathBuf) -> eyre::Result<Vec<PathBuf>> {
let sp = Spinner::new("Getting PDFs...".to_string());
match folder_path.exists() {
true => {
let mut pdf_file_paths = Vec::new();
let mut dir = fs::read_dir(folder_path).await?;
while let Some(entry) = dir.next_entry().await? {
let path = entry.path();
if path.is_file() && path.extension().unwrap_or_default() == "pdf" {
pdf_file_paths.push(path);
}
}
if !pdf_file_paths.is_empty() {
sp.finish_ok(format!("{} PDFs found", pdf_file_paths.len()));
Ok(pdf_file_paths)
} else {
sp.finish_error("No PDFs found in folder".to_string());
Err(eyre!("No PDFs found in folder"))
}
}
false => {
sp.finish_error("Folder does not exist".to_string());
Err(eyre!("Folder does not exist"))
}
}
}
async fn create_or_get_workspace(prefix: &str, collection_name: &str) -> eyre::Result<Workspace> {
let sp = Spinner::new("Checking workspace...".to_string());
let anythingllm = commands::anythingllm_client();
let workspace_name = format!("{}-{}", prefix, collection_name);
match anythingllm.get_workspace_by_name(&workspace_name).await {
Ok(workspace) => {
sp.finish_error("Workspace exists".to_string());
let confirmation = Confirm::new()
.with_prompt(" Do you want to continue?")
.interact()
.expect("Error getting confirmation");
if !confirmation {
sp.finish_error("Cancelled".to_string());
return Err(eyre!("Error creating workspace"));
}
let _ = anythingllm.delete_workspace_slug(&workspace.slug).await;
}
Err(_) => {}
}
match anythingllm.create_workspace(&workspace_name).await {
Ok(workspace) => {
sp.finish_ok(format!("Created workspace {}", workspace.name));
Ok(workspace)
}
Err(_) => {
sp.finish_error("Error creating workspace".to_string());
Err(eyre!("Error creating workspace"))
}
}
}
async fn upload_pdfs(file_paths: Vec<PathBuf>) -> eyre::Result<Vec<String>> {
let mut failures = Vec::<PathBuf>::new();
let doc_count = file_paths.len();
let bar = ProgressBar::new(doc_count as u64);
let bar_style = ProgressStyle::default_bar()
.template("{bar:100.cyan/blue} {pos:>7}/{len:7} {msg} {eta}")
.unwrap();
bar.set_style(bar_style.progress_chars("##-"));
let anythingllm = Arc::new(commands::anythingllm_client());
let failed_docs = Arc::new(Mutex::new(Vec::<PathBuf>::new()));
let docs: Vec<_> = stream::iter(file_paths)
.map(|document_filepath| {
let span = span!(Level::INFO, "process PDF");
let anythingllm = anythingllm.clone();
let failed_docs = failed_docs.clone();
let bar = bar.clone();
async move {
event!(Level::INFO, "Uploading");
match anythingllm.post_document_upload(&document_filepath).await {
Ok(doc) => {
event!(Level::INFO, "upload success");
bar.inc(1);
doc.location
}
Err(_) => {
let mut failed_docs = failed_docs.lock().await;
failed_docs.push(document_filepath.clone());
event!(
Level::INFO,
"upload fail: {}",
document_filepath.as_path().display(),
);
None
}
}
}
.instrument(span)
})
.buffered(100)
.filter_map(|f| async { f })
.collect()
.await;
bar.finish();
let failed_docs_mutex_guard = failed_docs.lock().await;
failures.append(&mut failed_docs_mutex_guard.clone());
if failures.is_empty() {
println!("{}", " All documents uploaded successfully.".green());
} else {
let dirs = directories_next::ProjectDirs::from("com", "richardlyon", "aza").unwrap();
let filename = format!(
"log_{}.txt",
chrono::Local::now().format("%Y-%m-%d_%H-%M-%S")
);
let log_path = dirs.config_dir().join(filename);
let mut log_file = std::fs::File::create(&log_path).unwrap();
for doc in failures.iter() {
writeln!(log_file, "{}", doc.display()).unwrap();
}
let message = format!(
" {} document(s) failed to upload. See {} for details.",
failures.len(),
log_path.display()
);
println!("{}", message.red());
}
Ok(docs)
}
/// Requests that the documents in `pdfs` be embedded in `workspace`.
///
/// Calls `update_embeddings` with the workspace slug and `UpdateParameter::Adds`.
/// Progress and the outcome are shown on a terminal spinner.
///
/// # Errors
///
/// Any failure is reported as `"Error embedding"`; the underlying error is
/// discarded.
async fn embed_pdfs(workspace: &Workspace, pdfs: Vec<String>) -> eyre::Result<()> {
    let spinner = Spinner::new(String::from("Embedding PDFs in workspace..."));
    let client = commands::anythingllm_client();

    let outcome = client
        .update_embeddings(&workspace.slug, pdfs, UpdateParameter::Adds)
        .await;

    if outcome.is_ok() {
        spinner.finish_ok(String::from("Embedding OK"));
        return Ok(());
    }

    spinner.finish_error(String::from("Error embedding"));
    Err(eyre!("Error embedding"))
}
/// Resolves the file path of every item in `pdfs` against the configured Zotero library root.
///
/// The root is read from `Config::from_file()` (`zotero_library_root_path`).
///
/// # Panics
///
/// Panics if the configuration cannot be loaded or if `filepath` fails for
/// any item.
pub fn file_paths(pdfs: Vec<Item>) -> Vec<PathBuf> {
    let library_root = PathBuf::from(Config::from_file().unwrap().zotero_library_root_path);

    pdfs.iter()
        .map(|item| item.filepath(&library_root).unwrap())
        .collect()
}