use http::{
StatusCode, Uri,
uri::{Builder, InvalidUri},
};
use ocm_types::{
common::OcmAddress,
discovery::Discovery,
error::{Error, ValidationError},
notification::NewNotification,
share::SendingServer,
};
use crate::{
common::HttpClient,
drivers::shares::{SentShareRepo, ReceivedShareRepo, ShareRepoError},
};
pub const NOTIFICATION_ENDPOINT: &str = "/notifications";
#[derive(Debug)]
pub enum NotificationError {
InvalidOcmEndpoint(InvalidUri),
SerdeError(serde_json::Error),
RequestError(String),
Error(String),
}
impl From<serde_json::Error> for NotificationError {
fn from(value: serde_json::Error) -> Self {
Self::SerdeError(value)
}
}
pub async fn send_notification(
client: &impl HttpClient,
ocm_server: &Discovery,
notification: NewNotification,
) -> Result<String, NotificationError> {
let receiving_server_endpoint: Uri = ocm_server
.end_point
.as_str()
.try_into()
.map_err(NotificationError::InvalidOcmEndpoint)?;
let path = receiving_server_endpoint
.path()
.strip_suffix("/")
.unwrap_or(receiving_server_endpoint.path())
.to_string();
client
.post(
&Builder::from(receiving_server_endpoint)
.path_and_query(path + NOTIFICATION_ENDPOINT)
.build()
.unwrap(),
serde_json::to_value(notification)?,
)
.await
.map_err(NotificationError::RequestError)
}
pub async fn receive_notification<SS: SentShareRepo, R: ReceivedShareRepo>(
sent_shares: &SS,
received_shares: &R,
sending_server: SendingServer,
notification: NewNotification,
) -> Result<(), (StatusCode, Error)> {
match notification.notification_type.as_str() {
"SHARE_ACCEPTED" => {
Ok(())
}
"SHARE_DECLINED" => sent_shares
.remove(¬ification.provider_id)
.await
.map(|_| ())
.map_err(|e| match e {
ShareRepoError::NotFound => (
StatusCode::NOT_FOUND,
Error {
message: "Share not found".to_string(),
validation_errors: vec![],
},
),
}),
"SHARE_UNSHARED" => received_shares
.remove(sending_server, ¬ification.provider_id)
.await
.map(|_| ())
.map_err(|e| match e {
ShareRepoError::NotFound => (
StatusCode::NOT_FOUND,
Error {
message: "Share not found".to_string(),
validation_errors: vec![],
},
),
}),
"USER_REMOVED" if ¬ification.resource_type == "user" => {
let sender: OcmAddress = notification.provider_id.try_into().map_err(|e| {
(
StatusCode::NOT_FOUND,
Error {
message: "Invalid Provider Id".to_string(),
validation_errors: vec![ValidationError{ name: None, message: Some(format!("In USER_REMOVED notifications the provider_id must be a valid OCM Address: {e}")) }],
},
)
})?;
for (share, _) in received_shares
.get_by_sender(&sender)
.await
.map_err(|e| match e {
ShareRepoError::NotFound => (
StatusCode::NOT_FOUND,
Error {
message: "Share not found".to_string(),
validation_errors: vec![],
},
),
})?
.iter()
{
sent_shares
.remove(&share.provider_id)
.await
.map_err(|_| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Error {
message: "Failed to delete received shares from sender".to_string(),
validation_errors: vec![],
},
)
})?;
}
Ok(())
}
t => Err((
StatusCode::NOT_ACCEPTABLE,
Error {
message: "Unknown notification type".to_string(),
validation_errors: vec![ValidationError{ name: None, message: Some(t.to_string()) }],
},
)),
}
}
#[cfg(test)]
mod tests {
use http::Uri;
use ocm_types::{
discovery::Discovery,
share::{AccessType, NewShare, WebDavPermissions},
};
use serde::Serialize;
use crate::{discovery::discover, drivers::{resources::Resource, shares::{InMemorySentShareRepo, InMemoryShareRepo}}};
use super::*;
struct TestClient<R: ReceivedShareRepo, S: SentShareRepo> {
pub received_shares: R,
pub sent_shares: S,
}
#[derive(Debug, Clone, Serialize, Default)]
struct TestResource();
impl Resource for TestResource {
const RESOURCE_TYPE: &str = "test-resource";
fn uri(&self) -> &str {
todo!()
}
fn name(&self) -> &str {
todo!()
}
}
impl TestClient<InMemoryShareRepo, InMemorySentShareRepo<TestResource>> {
fn new() -> Self {
Self {
received_shares: InMemoryShareRepo::default(),
sent_shares: InMemorySentShareRepo::<TestResource>::default(),
}
}
}
impl HttpClient for TestClient<InMemoryShareRepo, InMemorySentShareRepo<TestResource>> {
async fn get(&self, url: &Uri) -> Result<String, String> {
match url.to_string().as_str() {
"https://test-receiver.example.org/.well-known/ocm" => {
Ok(serde_json::to_string(&Discovery {
enabled: true,
api_version: "1.2.0".to_string(),
end_point: "https://test-receiver.example.org".to_string(),
provider: Some("Test Receiver".to_string()),
..Default::default()
})
.unwrap())
}
"https://test-provider.example.org/.well-known/ocm" => {
Ok(serde_json::to_string(&Discovery {
enabled: true,
api_version: "1.2.0".to_string(),
end_point: "https://test-provider.example.org".to_string(),
provider: Some("Test Provider".to_string()),
..Default::default()
})
.unwrap())
}
_ => Err("NOT_FOUND".to_string()),
}
}
async fn post(&self, url: &Uri, body: serde_json::Value) -> Result<String, String> {
match url.to_string().as_str() {
"https://test-receiver.example.org/notifications" => receive_notification(
&self.sent_shares,
&self.received_shares,
"test-provider.example.org".into(),
serde_json::from_value(body).unwrap(),
)
.await
.map(|ok| serde_json::to_string(&ok).unwrap())
.map_err(|(_code, err)| serde_json::to_string(&err).unwrap()),
"https://test-provider.example.org/notifications" => receive_notification(
&self.sent_shares,
&self.received_shares,
"test-provider.example.org".into(),
serde_json::from_value(body).unwrap(),
)
.await
.map(|ok| serde_json::to_string(&ok).unwrap())
.map_err(|(_code, err)| serde_json::to_string(&err).unwrap()),
_ => Err("NOT_FOUND".to_string()),
}
}
fn allow_http(&self) -> bool {
false
}
}
#[tokio::test]
async fn send_receive_share_declined() {
let client = TestClient::new();
let notification_share_declined = NewNotification {
notification_type: "SHARE_DECLINED".to_string(),
resource_type: "file".to_string(),
provider_id: "test-share".to_string(),
notification: None,
};
send_notification(
&client,
&discover(
&client,
&"test-provider.example.org".try_into().unwrap(),
)
.await
.unwrap(),
notification_share_declined.clone(),
)
.await
.expect_err("There are no shares, SHARE_DECLINED notification should have failed!");
let new_share = NewShare {
share_with: "recipient@test-receiver.example.org".try_into().unwrap(),
name: "Test Share".to_string(),
description: None,
provider_id: "test-share".to_string(),
owner: "owner@test-provider.example.org".try_into().unwrap(),
sender: "owner@test-provider.example.org".try_into().unwrap(),
owner_display_name: None,
sender_display_name: None,
share_type: ocm_types::common::ShareType::User,
resource_type: "file".to_string(),
expiration: None,
protocol: ocm_types::share::Protocol {
name: "multi".to_string(),
webdav: Some(ocm_types::share::WebDavProperties {
access_types: vec![AccessType::Remote],
uri: "example.org/".to_string(),
shared_secret: Some("password".to_string()),
refresh_token: None,
permissions: vec![WebDavPermissions::Read],
requirements: vec![],
}),
..Default::default()
},
};
client
.sent_shares
.insert(new_share, TestResource())
.await
.unwrap();
send_notification(
&client,
&discover(
&client,
&"test-provider.example.org".try_into().unwrap(),
)
.await
.unwrap(),
notification_share_declined,
)
.await
.expect("Notification should have been successful");
}
}