#![doc = include_str!("../README.md")]
use flume::Receiver;
use reqwest::Client as ReqwestClient;
use scc::HashMap as ConcurrentHashMap;
use scc::hash_map::OccupiedEntry;
use std::fmt::{Debug, Formatter};
use std::result::Result;
use std::sync::Arc;
use crate::model::anchorage::{Options, NodeOptions, NodeManagerOptions, PlayerOptions, ConnectionOptions};
use crate::model::error::AnchorageError;
use crate::model::player::EventType;
use crate::node::client::Node;
use crate::player::Player;
pub mod model;
pub mod node;
pub mod player;
pub struct Anchorage {
pub user_agent: String,
pub reconnect_tries: u16,
pub nodes: Arc<ConcurrentHashMap<String, Node>>,
pub(crate) request: ReqwestClient,
}
impl Debug for Anchorage {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("LavalinkClient")
.field("user_agent", &self.user_agent)
.field("reconnect_tries", &self.reconnect_tries)
.field("nodes", &self.nodes.len())
.finish()
}
}
impl Anchorage {
pub fn new(mut options: Options) -> Self {
Self {
user_agent: options.user_agent.unwrap_or(format!("Anchorage/{}", env!("CARGO_PKG_VERSION"))),
reconnect_tries: options.reconnect_tries.unwrap_or(u16::MAX),
request: options
.request
.get_or_insert_with(ReqwestClient::new)
.to_owned(),
nodes: Arc::new(ConcurrentHashMap::new())
}
}
#[tracing::instrument(skip(self, nodes_data))]
pub async fn start(
&self,
user_id: u64,
nodes_data: Vec<impl Into<NodeOptions>>,
) -> Result<(), AnchorageError> {
tracing::info!(
"Starting Lavalink with user_id ({}) and {} node(s)",
user_id,
nodes_data.len()
);
for data in nodes_data {
let info = data.into();
let (node, handle) = Node::new(NodeManagerOptions {
name: &info.name,
host: &info.host,
port: info.port,
auth: &info.auth,
id: user_id,
request: self.request.clone(),
user_agent: &self.user_agent,
reconnect_tries: self.reconnect_tries,
})
.await?;
self.nodes.insert_async(info.name, node).await.ok();
let nodes = self.nodes.clone();
tokio::spawn(async move {
let Ok(name) = handle.await else {
return;
};
let _ = nodes.remove_async(&name).await;
});
}
Ok(())
}
pub async fn get_ideal_node(&self) -> Result<Node, AnchorageError> {
let mut nodes = vec![];
self.nodes
.iter_async(|_, node| {
nodes.push(node.clone());
false
})
.await;
let mut penalties: f64 = 0.0;
let mut selected_node: Option<Node> = None;
for node in nodes {
let data = node.data().await?;
if selected_node.is_none() {
selected_node = Some(node);
continue;
}
if penalties > data.penalties {
selected_node = Some(node);
}
penalties = data.penalties;
}
match selected_node {
Some(node) => Ok(node),
None => Err(AnchorageError::NoNodesAvailable),
}
}
pub async fn get_node_for_player(&self, guild_id: u64) -> Option<OccupiedEntry<String, Node>> {
self.nodes
.any_async(|_, node| node.events_sender.contains_sync(&guild_id))
.await
}
pub async fn create_player(
&self,
guild_id: u64,
node: Node,
connection: impl Into<ConnectionOptions>,
) -> Result<(Player, Receiver<EventType>), AnchorageError> {
if self.get_node_for_player(guild_id).await.is_some() {
return Err(AnchorageError::CreateExistingPlayer);
}
let (player, events_sender, events_receiver) = Player::new(PlayerOptions {
node: node.clone(),
guild_id,
connection: connection.into(),
})
.await?;
let _ = node
.events_sender
.insert_async(guild_id, events_sender)
.await;
Ok((player, events_receiver))
}
pub async fn destroy_player(&self, guild_id: u64) -> Result<(), AnchorageError> {
let Some(node) = self.get_node_for_player(guild_id).await else {
return Ok(());
};
node.rest.destroy_player(guild_id).await?;
if let Some(sender) = node.events_sender.get_async(&guild_id).await {
sender.send_async(EventType::Destroyed).await.ok();
}
node.events_sender.remove_async(&guild_id).await;
Ok(())
}
pub async fn connect(&self, name: &str) -> Result<(), AnchorageError> {
if let Some(mut data) = self.nodes.get_async(name).await {
let node = data.get_mut();
node.connect().await?;
}
Ok(())
}
pub async fn disconnect(&self, name: &str, destroy: bool) -> Result<(), AnchorageError> {
if let Some(mut data) = self.nodes.get_async(name).await {
let node = data.get_mut();
node.disconnect().await?;
if destroy {
node.destroy().await?;
self.nodes.remove_async(name).await;
}
}
Ok(())
}
}