use crate::config::get_cache_dir;
use crate::log::StyledText;
use crate::pyproject::{Adapter, NbTomlEditor, PyProjectConfig};
use crate::utils::terminal_utils;
use crate::uv;
use anyhow::{Context, Result};
use clap::Subcommand;
use dialoguer::theme::ColorfulTheme;
use dialoguer::{Confirm, MultiSelect};
use reqwest::Client;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};
use std::sync::OnceLock;
use std::time::Duration;
use tracing::{debug, error, info, warn};
/// A single adapter entry as published by the adapter registry.
///
/// Values of this type are deserialized from the registry's `adapters.json`
/// response and serialized again when the listing is written to the local
/// cache file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RegistryAdapter {
/// Module name of the adapter; copied into the project's `Adapter` entry.
pub module_name: String,
/// Package identifier passed to `uv add` / `uv remove` for this adapter.
pub project_link: String,
/// Display name; also the key under which the adapter is stored in the
/// in-memory and cached registry map.
pub name: String,
/// Description text, printed by `display_adapter_info`.
pub desc: String,
/// Author text, printed by `display_adapter_info`.
pub author: String,
/// Optional homepage, printed by `display_adapter_info` when present.
pub homepage: Option<String>,
/// Tag records from the registry; not read anywhere in this file.
/// NOTE(review): presumably label/color pairs — confirm against the registry schema.
pub tags: Vec<HashMap<String, String>>,
/// Registry-provided flag; not read anywhere in this file.
pub is_official: bool,
/// Registry-provided timestamp string; not read anywhere in this file.
pub time: String,
/// Version string, printed with a `v` prefix by `display_adapter`.
pub version: String,
}
impl From<&RegistryAdapter> for Adapter {
fn from(adapter: &RegistryAdapter) -> Self {
Self {
name: adapter.name.clone(),
module_name: adapter.module_name.clone(),
}
}
}
/// Installs, uninstalls and lists adapters for the project located in `work_dir`.
pub struct AdapterManager {
// HTTP client used to download the registry listing.
client: Client,
// Directory handed to pyproject parsing, `NbTomlEditor` and the `uv` commands.
work_dir: PathBuf,
// Registry listing keyed by adapter name; filled at most once per manager.
registry_adapters: OnceLock<HashMap<String, RegistryAdapter>>,
// Adapters read from the project configuration; filled at most once per manager.
installed_adapters: OnceLock<Vec<Adapter>>,
}
impl Default for AdapterManager {
    /// Creates a manager for the current directory, equivalent to `AdapterManager::new(None)`.
    ///
    /// # Panics
    ///
    /// Panics if the HTTP client cannot be constructed. `Default` cannot
    /// report errors; call [`AdapterManager::new`] to handle that case.
    fn default() -> Self {
        Self::new(None).expect("failed to build the HTTP client for the default AdapterManager")
    }
}
impl AdapterManager {
/// Creates a manager for the project in `work_dir`, or in `"."` when `None` is given.
///
/// The HTTP client is configured with a 15 second timeout and the `nbr`
/// user agent.
///
/// # Errors
///
/// Returns an error when the HTTP client cannot be built.
pub fn new(work_dir: Option<PathBuf>) -> Result<Self> {
    let work_dir = match work_dir {
        Some(dir) => dir,
        None => Path::new(".").to_path_buf(),
    };
    let request_timeout = Duration::from_secs(15);
    let client = Client::builder()
        .timeout(request_timeout)
        .user_agent("nbr")
        .build()?;
    let manager = Self {
        client,
        work_dir,
        registry_adapters: OnceLock::new(),
        installed_adapters: OnceLock::new(),
    };
    Ok(manager)
}
/// Returns the path of the `adapters.json` file inside the cache directory.
///
/// # Errors
///
/// Propagates any error from `get_cache_dir`.
fn get_cache_file(&self) -> Result<PathBuf> {
    let cache_file = get_cache_dir()?.join("adapters.json");
    Ok(cache_file)
}
/// Stores the registry listing in the manager's once-only cell.
///
/// # Errors
///
/// Returns an error when the cell has already been filled.
fn set_registry_adapters(&self, adapters: HashMap<String, RegistryAdapter>) -> Result<()> {
    match self.registry_adapters.set(adapters) {
        Ok(()) => Ok(()),
        // `set` hands the rejected map back when a value is already present.
        Err(_rejected) => Err(anyhow::anyhow!("Failed to set cached adapters")),
    }
}
/// Returns the registry listing that was previously loaded into this manager.
///
/// # Errors
///
/// Returns an error when no listing has been stored yet.
pub fn get_registry_adapters(&self) -> Result<&HashMap<String, RegistryAdapter>> {
    let loaded = self.registry_adapters.get();
    loaded.context("Registry adapters not initialized")
}
pub async fn fetch_registry_adapters(
&self,
fetch_remote: bool,
) -> Result<&HashMap<String, RegistryAdapter>> {
if let Some(adapters) = self.registry_adapters.get() {
return Ok(adapters);
}
let cache_file = self.get_cache_file()?;
if !fetch_remote && cache_file.exists() {
debug!("Loading adapters from cache: {}", cache_file.display());
let registry_adapters: HashMap<String, RegistryAdapter> =
serde_json::from_slice(&std::fs::read(&cache_file)?)?;
self.set_registry_adapters(registry_adapters)?;
return self.get_registry_adapters();
}
let spinner = terminal_utils::create_spinner("Fetching adapters from registry...");
let adapters_json_url = "https://registry.nonebot.dev/adapters.json";
let response = self.client.get(adapters_json_url).send().await?;
let adapters: Vec<RegistryAdapter> = response
.json()
.await
.context("Failed to parse adapter info")?;
spinner.finish_and_clear();
let registry_adapters = adapters
.iter()
.map(|a| (a.name.to_owned(), a.clone()))
.collect::<HashMap<String, RegistryAdapter>>();
std::fs::write(cache_file, serde_json::to_string(®istry_adapters)?)?;
self.set_registry_adapters(registry_adapters)?;
self.get_registry_adapters()
}
/// Returns the adapters declared in the project configuration, parsing it on first use.
///
/// Yields `None` when the configuration cannot be parsed, has no nonebot
/// section, or declares no adapters. A successful result is memoized on the
/// manager.
pub fn parse_installed_adapters(&self) -> Option<&Vec<Adapter>> {
    if self.installed_adapters.get().is_none() {
        let config = PyProjectConfig::parse(Some(&self.work_dir)).ok()?;
        let declared = config.nonebot()?.adapters.to_owned()?;
        self.installed_adapters.set(declared).ok()?;
    }
    self.installed_adapters.get()
}
/// Returns the names of the adapters declared in the project configuration.
///
/// The list is empty when `parse_installed_adapters` yields nothing.
pub fn get_installed_adapters_names(&self) -> Vec<&str> {
    self.parse_installed_adapters()
        .map(|adapters| adapters.iter().map(|a| a.name.as_str()).collect())
        .unwrap_or_default()
}
/// Prompts the user to pick adapters from the registry listing.
///
/// Adapter names are offered in sorted order; with `filter_installed` set,
/// names already declared in the project are left out. When no name is left
/// to offer, no prompt is shown and `"OneBot V11"` is used as the selection.
/// Selected names that are absent from the registry are dropped from the result.
///
/// # Errors
///
/// Returns an error when the registry cannot be loaded or the prompt fails.
pub async fn select_adapters(
    &self,
    fetch_remote: bool,
    filter_installed: bool,
) -> Result<Vec<&RegistryAdapter>> {
    let registry = self.fetch_registry_adapters(fetch_remote).await?;

    let already_installed = if filter_installed {
        self.get_installed_adapters_names()
    } else {
        Vec::new()
    };
    let mut candidates: Vec<String> = registry
        .keys()
        .filter(|name| !already_installed.contains(&name.as_str()))
        .cloned()
        .collect();
    candidates.sort();

    let chosen: Vec<String> = if candidates.is_empty() {
        vec!["OneBot V11".to_string()]
    } else {
        MultiSelect::with_theme(&ColorfulTheme::default())
            .with_prompt("Which adapter(s) would you like to use")
            .items(&candidates)
            .interact()?
            .into_iter()
            .map(|index| candidates[index].clone())
            .collect()
    };

    let mut picked = Vec::with_capacity(chosen.len());
    for name in &chosen {
        if let Some(adapter) = registry.get(name) {
            picked.push(adapter);
        }
    }
    Ok(picked)
}
pub async fn install_adapters(&self, fetch_remote: bool) -> Result<()> {
let selected_adapters = self.select_adapters(fetch_remote, true).await?;
if selected_adapters.is_empty() {
warn!("You haven't selected any adapters to install");
return Ok(());
}
let selected_adapters_names = selected_adapters
.iter()
.map(|a| a.name.clone())
.collect::<Vec<String>>()
.join(", ");
let prompt = StyledText::new(" ")
.white_bold("Would you like to install")
.cyan_bold(format!("[{}]", selected_adapters_names).as_str())
.to_string();
if !Confirm::with_theme(&ColorfulTheme::default())
.with_prompt(&prompt)
.default(true)
.interact()?
{
error!("{}", "Installation operation cancelled.");
return Ok(());
}
let adapter_packages = selected_adapters
.iter()
.map(|a| a.project_link.as_str())
.collect::<HashSet<&str>>() .into_iter()
.collect::<Vec<&str>>();
uv::add(adapter_packages)
.working_dir(&self.work_dir)
.run()?;
let adapters = selected_adapters
.iter()
.map(|a| (*a).into())
.collect::<Vec<Adapter>>();
NbTomlEditor::with_work_dir(Some(&self.work_dir))?.add_adapters(adapters)?;
StyledText::new(" ")
.green_bold("✓ Successfully installed adapters:")
.cyan_bold(&selected_adapters_names)
.println();
Ok(())
}
/// Collects the names of packages reported by `uv::list` whose name contains
/// `nonebot-adapter-`.
///
/// # Errors
///
/// Propagates any error from `uv::list`.
#[allow(dead_code)]
pub async fn get_installed_adapters_from_venv(&self) -> Result<HashSet<String>> {
    let packages = uv::list(false).await?;
    let mut adapter_packages = HashSet::new();
    for package in packages {
        if package.name.contains("nonebot-adapter-") {
            adapter_packages.insert(package.name);
        }
    }
    debug!("Installed adapters: {:?}", adapter_packages);
    Ok(adapter_packages)
}
/// Interactively uninstalls adapters from the project.
///
/// Steps: let the user pick from the adapters declared in the project, load
/// the registry listing, remove the picked adapters from the project
/// configuration, then `uv remove` the packages that no remaining adapter
/// still needs.
///
/// Nothing is changed when no adapter is installed or none is selected.
///
/// # Errors
///
/// Returns an error when the prompt, the registry lookup, the configuration
/// update, or `uv remove` fails.
pub async fn uninstall_adapters(&self) -> Result<()> {
    let mut installed_adapters = self.get_installed_adapters_names();
    if installed_adapters.is_empty() {
        warn!("You haven't installed any adapters");
        return Ok(());
    }

    let selected_adapters: Vec<&str> = MultiSelect::with_theme(&ColorfulTheme::default())
        .with_prompt("Select installed adapter(s) to uninstall")
        .items(&installed_adapters)
        .interact()?
        .into_iter()
        .map(|i| installed_adapters[i])
        .collect();
    // Mirror `install_adapters`: an empty selection is a no-op, not a success.
    if selected_adapters.is_empty() {
        warn!("You haven't selected any adapters to uninstall");
        return Ok(());
    }

    // Resolve the registry before editing the project, so a failed lookup
    // cannot leave the configuration changed while the packages stay installed.
    let registry_adapters = self.fetch_registry_adapters(false).await?;

    NbTomlEditor::with_work_dir(Some(&self.work_dir))?
        .remove_adapters(selected_adapters.to_vec())?;

    // From here on `installed_adapters` holds only the adapters that remain.
    installed_adapters.retain(|name| !selected_adapters.contains(name));

    // Packages that a remaining adapter still depends on must stay installed.
    let still_required: HashSet<&str> = installed_adapters
        .iter()
        .filter_map(|name| {
            registry_adapters
                .get(*name)
                .map(|a| a.project_link.as_str())
        })
        .collect();
    // Name-based guard kept for remaining OneBot adapters that are missing
    // from the registry listing and so cannot be resolved to a package above.
    let keep_onebot = installed_adapters
        .iter()
        .any(|name| name.starts_with("OneBot"));

    let adapter_packages: Vec<&str> = selected_adapters
        .iter()
        .filter_map(|name| {
            registry_adapters
                .get(*name)
                .map(|a| a.project_link.as_str())
        })
        .filter(|package| !still_required.contains(package))
        .filter(|package| !(keep_onebot && *package == "nonebot-adapter-onebot"))
        .collect::<HashSet<&str>>()
        .into_iter()
        .collect();

    if !adapter_packages.is_empty() {
        uv::remove(adapter_packages)
            .working_dir(&self.work_dir)
            .run()?;
    }

    StyledText::new(" ")
        .green_bold("✓ Successfully uninstalled adapters:")
        .cyan_bold(selected_adapters.join(", "))
        .println();
    Ok(())
}
pub async fn list_adapters(&self, show_all: bool) -> Result<()> {
let installed_adapters = self.get_installed_adapters_names();
let adapters_map = self.fetch_registry_adapters(show_all).await?;
if show_all {
info!("All Adapters:");
adapters_map.iter().for_each(|(_, adapter)| {
self.display_adapter(adapter);
});
} else {
if installed_adapters.is_empty() {
warn!("No adapters installed.");
return Ok(());
}
info!("Installed Adapters:");
installed_adapters.iter().for_each(|name| {
if let Some(adapter) = adapters_map.get(*name) {
self.display_adapter(adapter);
}
});
}
Ok(())
}
/// Prints a one-line summary of `adapter`: bullet, name, package in
/// parentheses, and version prefixed with `v`.
pub fn display_adapter(&self, adapter: &RegistryAdapter) {
    let package_label = format!("({})", adapter.project_link);
    let version_label = format!("v{}", adapter.version);
    StyledText::new(" ")
        .cyan_bold(" •")
        .cyan_bold(&adapter.name)
        .text(package_label.as_str())
        .green(version_label.as_str())
        .println();
}
/// Prints a multi-line description of `adapter`: its name, followed by one
/// labelled line each for package, module, description, version and author,
/// and the homepage when one is set.
#[allow(dead_code)]
fn display_adapter_info(&self, adapter: &RegistryAdapter) {
    StyledText::new("").cyan_bold(&adapter.name).println();

    let rows = [
        (" Package:", &adapter.project_link),
        (" Module:", &adapter.module_name),
        (" Desc:", &adapter.desc),
        (" Version:", &adapter.version),
        (" Author:", &adapter.author),
    ];
    for (label, value) in rows {
        StyledText::new(" ").text(label).text(value).println();
    }

    if let Some(homepage) = adapter.homepage.as_ref() {
        StyledText::new(" ")
            .text(" Homepage:")
            .text(homepage)
            .println();
    }
}
}
// Subcommands for managing adapters; dispatched by `handle`.
// Plain `//` comments are used on purpose: `///` doc comments would be picked
// up by clap's derive and could alter the generated help text.
#[derive(Subcommand)]
pub enum AdapterCommands {
// Interactively select and install adapters.
#[clap(about = "Install adapters")]
Install {
// When set, the registry listing is fetched remotely instead of read from the cache file.
#[clap(short, long, help = "Fetch adapters from remote")]
fetch_remote: bool,
},
// Interactively select and uninstall adapters declared in the project.
#[clap(about = "Uninstall adapters")]
Uninstall,
// Print the installed adapters, or every registry adapter when `all` is set.
#[clap(about = "List installed adapters, show all adapters if --all is set")]
List {
// When set, the whole registry listing is fetched remotely and printed.
#[clap(short, long, help = "Show all adapters")]
all: bool,
},
}
/// Runs the given adapter subcommand against a manager for the current directory.
///
/// # Errors
///
/// Returns an error when the manager cannot be created or the selected
/// operation fails.
pub async fn handle(commands: &AdapterCommands) -> Result<()> {
    let manager = AdapterManager::new(None)?;
    match commands {
        AdapterCommands::Install { fetch_remote } => {
            manager.install_adapters(*fetch_remote).await
        }
        AdapterCommands::Uninstall => manager.uninstall_adapters().await,
        AdapterCommands::List { all } => manager.list_adapters(*all).await,
    }
}