use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, RwLock};
use url::Url;
use crate::completable::Completable;
use crate::types::{Resource, ResourceContents};
use std::collections::HashSet;
use tokio::sync::broadcast;
pub type ListResourcesResult = Vec<Resource>;
pub type ReadResourceResult = Vec<ResourceContents>;
type ListResourcesFuture = Pin<Box<dyn Future<Output = ListResourcesResult> + Send + 'static>>;
type ReadResourceFuture = Pin<Box<dyn Future<Output = ReadResourceResult> + Send + 'static>>;
type ListResourcesFn = Box<dyn Fn() -> ListResourcesFuture + Send + Sync>;
type ReadResourceFn = Box<dyn Fn(&Url) -> ReadResourceFuture + Send + Sync>;
type ArcReadResourceCallback = Arc<dyn ReadResourceCallback>;
#[derive(Clone)]
pub struct ResourceUpdateChannel {
sender: broadcast::Sender<String>,
subscribed_uris: Arc<RwLock<HashSet<String>>>,
}
impl ResourceUpdateChannel {
pub fn new() -> Self {
let (sender, _) = broadcast::channel(100);
Self {
sender,
subscribed_uris: Arc::new(RwLock::new(HashSet::new())),
}
}
pub fn subscribe(&self, uri: &Url) -> broadcast::Receiver<String> {
self.subscribed_uris.write().unwrap().insert(uri.to_string());
self.sender.subscribe()
}
pub fn unsubscribe(&self, uri: &Url) {
self.subscribed_uris.write().unwrap().remove(&uri.to_string());
}
pub fn notify_update(&self, uri: &Url) {
if self.subscribed_uris.read().unwrap().contains(&uri.to_string()) {
let _ = self.sender.send(uri.to_string());
}
}
}
impl Default for ResourceUpdateChannel {
fn default() -> Self {
Self::new()
}
}
pub struct ResourceTemplate {
uri_template: String,
list_callback: Option<Arc<dyn ListResourcesCallback>>,
complete_callbacks: HashMap<String, Arc<dyn Completable<Input = str, Output = String>>>,
}
impl ResourceTemplate {
pub fn new(uri_template: impl Into<String>) -> Self {
Self {
uri_template: uri_template.into(),
list_callback: None,
complete_callbacks: HashMap::new(),
}
}
pub fn with_list<F, Fut>(mut self, callback: F) -> Self
where
F: Fn() -> Fut + Send + Sync + 'static,
Fut: Future<Output = ListResourcesResult> + Send + 'static,
{
self.list_callback = Some(Arc::new(ListResourcesCallbackFn(Box::new(move || {
Box::pin(callback())
}))));
self
}
pub fn with_completion(
mut self,
variable: impl Into<String>,
completable: impl Completable<Input = str, Output = String> + 'static,
) -> Self {
self.complete_callbacks
.insert(variable.into(), Arc::new(completable));
self
}
pub fn uri_template(&self) -> &str {
&self.uri_template
}
pub fn list_callback(&self) -> Option<&dyn ListResourcesCallback> {
self.list_callback.as_deref()
}
pub fn complete_callback(
&self,
variable: &str,
) -> Option<&dyn Completable<Input = str, Output = String>> {
self.complete_callbacks.get(variable).map(|c| c.as_ref())
}
}
pub trait ListResourcesCallback: Send + Sync {
fn call(&self) -> ListResourcesFuture;
}
pub struct ListResourcesCallbackFn(ListResourcesFn);
impl ListResourcesCallback for ListResourcesCallbackFn {
fn call(&self) -> ListResourcesFuture {
(self.0)()
}
}
pub trait ReadResourceCallback: Send + Sync {
fn call(&self, uri: &Url) -> ReadResourceFuture;
}
pub struct ReadResourceCallbackFn(pub ReadResourceFn);
impl ReadResourceCallback for ReadResourceCallbackFn {
fn call(&self, uri: &Url) -> ReadResourceFuture {
(self.0)(uri)
}
}
pub(crate) struct RegisteredResource {
#[allow(dead_code)]
pub metadata: Resource,
#[allow(dead_code)]
pub read_callback: Arc<dyn ReadResourceCallback>,
#[allow(dead_code)]
pub update_channel: ResourceUpdateChannel,
#[allow(dead_code)]
pub supports_subscriptions: bool,
}
impl RegisteredResource {
pub fn new(
metadata: Resource,
read_callback: impl Fn(&Url) -> ReadResourceFuture + Send + Sync + 'static,
supports_subscriptions: bool,
) -> Self {
Self {
metadata,
read_callback: Arc::new(ReadResourceCallbackFn(Box::new(read_callback))),
update_channel: ResourceUpdateChannel::new(),
supports_subscriptions,
}
}
#[allow(dead_code)]
pub fn subscribe(&self) -> Option<broadcast::Receiver<String>> {
if self.supports_subscriptions {
Some(self.update_channel.subscribe(&self.metadata.uri))
} else {
None
}
}
#[allow(dead_code)]
pub fn unsubscribe(&self) {
if self.supports_subscriptions {
self.update_channel.unsubscribe(&self.metadata.uri);
}
}
#[allow(dead_code)]
pub fn notify_update(&self) {
if self.supports_subscriptions {
self.update_channel.notify_update(&self.metadata.uri);
}
}
}
pub(crate) struct RegisteredResourceTemplate {
#[allow(dead_code)]
pub template: ResourceTemplate,
#[allow(dead_code)]
pub metadata: Resource,
#[allow(dead_code)]
pub read_callback: ArcReadResourceCallback,
}
#[cfg(test)]
mod tests {
use super::*;
use crate::completable::CompletableString;
#[tokio::test]
async fn test_resource_template() {
let template = ResourceTemplate::new("file://{path}")
.with_list(|| async { vec![] })
.with_completion(
"path",
CompletableString::new(|input: &str| {
let input = input.to_string();
async move { vec![format!("{}/file.txt", input)] }
}),
);
assert_eq!(template.uri_template(), "file://{path}");
assert!(template.list_callback().is_some());
assert!(template.complete_callback("path").is_some());
assert!(template.complete_callback("nonexistent").is_none());
}
}