opencloudmesh 0.2.1

Implementation of the OpenCloudMesh protocol
Documentation
// SPDX-FileCopyrightText: 2026 Matthias Kraus <info@opengeomesh.org>
//
// SPDX-License-Identifier: LGPL-3.0-or-later

use http::{
    StatusCode, Uri,
    uri::{Builder, InvalidUri},
};
use ocm_types::{
    common::OcmAddress,
    discovery::Discovery,
    error::{Error, ValidationError},
    notification::NewNotification,
    share::SendingServer,
};

use crate::{
    common::HttpClient,
    drivers::shares::{SentShareRepo, ReceivedShareRepo, ShareRepoError},
};

/// Path of the notifications endpoint.
///
/// It is appended to the path of a server's OCM endpoint when a notification is sent.
pub const NOTIFICATION_ENDPOINT: &str = "/notifications";

/// Potential errors when sending or receiving notifications.
///
/// Implements [`std::error::Error`], so it can be boxed, chained and reported
/// like any other error type.
#[derive(Debug)]
pub enum NotificationError {
    /// The OCM endpoint could not be parsed as a URI.
    InvalidOcmEndpoint(InvalidUri),
    /// Converting a notification to or from JSON failed.
    SerdeError(serde_json::Error),
    /// The request itself failed; carries the message reported by the HTTP client.
    RequestError(String),
    /// Any other failure, described by the contained message.
    Error(String),
}

impl std::fmt::Display for NotificationError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Wrapped errors are exposed through `source()`, so they are not
        // repeated in the message itself.
        match self {
            Self::InvalidOcmEndpoint(_) => f.write_str("invalid OCM endpoint URI"),
            Self::SerdeError(_) => f.write_str("failed to convert notification to or from JSON"),
            Self::RequestError(msg) => write!(f, "notification request failed: {msg}"),
            Self::Error(msg) => f.write_str(msg),
        }
    }
}

impl std::error::Error for NotificationError {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::InvalidOcmEndpoint(e) => Some(e),
            Self::SerdeError(e) => Some(e),
            Self::RequestError(_) | Self::Error(_) => None,
        }
    }
}

impl From<serde_json::Error> for NotificationError {
    fn from(value: serde_json::Error) -> Self {
        Self::SerdeError(value)
    }
}

/// Send a notification to an OCM Server
///
/// * `ocm_server` OCM Server to which the notification is sent
/// * `notification` notification to send
pub async fn send_notification(
    client: &impl HttpClient,
    ocm_server: &Discovery,
    notification: NewNotification,
) -> Result<String, NotificationError> {
    let receiving_server_endpoint: Uri = ocm_server
        .end_point
        .as_str()
        .try_into()
        .map_err(NotificationError::InvalidOcmEndpoint)?;

    let path = receiving_server_endpoint
        .path()
        .strip_suffix("/")
        .unwrap_or(receiving_server_endpoint.path())
        .to_string();
    client
        .post(
            &Builder::from(receiving_server_endpoint)
                .path_and_query(path + NOTIFICATION_ENDPOINT)
                .build()
                .unwrap(),
            serde_json::to_value(notification)?,
        )
        .await
        .map_err(NotificationError::RequestError)
}

/// Receive a notification from an OCM Server
pub async fn receive_notification<SS: SentShareRepo, R: ReceivedShareRepo>(
    sent_shares: &SS,
    received_shares: &R,
    //FIXME this should be "validated notification sender"
    sending_server: SendingServer,
    notification: NewNotification,
) -> Result<(), (StatusCode, Error)> {
    // TODO check http-sig
    // TODO check if correct secret or token is included?
    // spec is unfortunately very thin regarding notifications
    // look at https://github.com/nextcloud/server/blob/master/apps/federatedfilesharing/lib/OCM/CloudFederationProviderFiles.php#L229
    // https://github.com/nextcloud/server/blob/master/apps/cloud_federation_api/lib/Controller/RequestHandlerController.php#L340
    // TODO document supported notifications (by moving them to an enum?)
    match notification.notification_type.as_str() {
        "SHARE_ACCEPTED" => {
            // TODO "Should this be reflected via states of a share? Can I depend on the SHARE_ACCEPTED notification to be sent at some point?")
            Ok(())
        }
        "SHARE_DECLINED" => sent_shares
            // FIXME logic error, sending server of notification is not sending server of share...
            .remove(&notification.provider_id)
            .await
            .map(|_| ())
            .map_err(|e| match e {
                ShareRepoError::NotFound => (
                    StatusCode::NOT_FOUND,
                    Error {
                        message: "Share not found".to_string(),
                        validation_errors: vec![],
                    },
                ),
            }),
        "SHARE_UNSHARED" => received_shares
            .remove(sending_server, &notification.provider_id)
            .await
            .map(|_| ())
            .map_err(|e| match e {
                ShareRepoError::NotFound => (
                    StatusCode::NOT_FOUND,
                    Error {
                        message: "Share not found".to_string(),
                        validation_errors: vec![],
                    },
                ),
            }),
        "USER_REMOVED" if &notification.resource_type == "user" => {
            let sender: OcmAddress = notification.provider_id.try_into().map_err(|e| {
                (
                    StatusCode::NOT_FOUND,
                    Error {
                        message: "Invalid Provider Id".to_string(),
                        validation_errors: vec![ValidationError{ name: None, message: Some(format!("In USER_REMOVED notifications the provider_id must be a valid OCM Address: {e}")) }],
                    },
                )
            })?;
            for (share, _) in received_shares
                .get_by_sender(&sender)
                .await
                .map_err(|e| match e {
                    ShareRepoError::NotFound => (
                        StatusCode::NOT_FOUND,
                        Error {
                            message: "Share not found".to_string(),
                            validation_errors: vec![],
                        },
                    ),
                })?
                // TODO switch to stream to delete multiple shares concurrently?
                // Add API to remove shares by sender to shareRepo?
                .iter()
            {
                sent_shares
                    .remove(&share.provider_id)
                    .await
                    .map_err(|_| {
                        (
                            StatusCode::INTERNAL_SERVER_ERROR,
                            Error {
                                message: "Failed to delete received shares from sender".to_string(),
                                validation_errors: vec![],
                            },
                        )
                    })?;
            }

            Ok(())
        }
        t => Err((
            StatusCode::NOT_ACCEPTABLE,
            Error {
                message: "Unknown notification type".to_string(),
                validation_errors: vec![ValidationError{ name: None, message: Some(t.to_string()) }],
            },
        )),
    }
}

#[cfg(test)]
mod tests {
    use http::Uri;
    use ocm_types::{
        discovery::Discovery,
        share::{AccessType, NewShare, WebDavPermissions},
    };
    use serde::Serialize;

    use crate::{discovery::discover, drivers::{resources::Resource, shares::{InMemorySentShareRepo, InMemoryShareRepo}}};

    use super::*;

    /// Test double that serves as HTTP client and answers the requests itself,
    /// using its own share repositories instead of talking to a network.
    struct TestClient<R: ReceivedShareRepo, S: SentShareRepo> {
        pub received_shares: R,
        pub sent_shares: S,
    }

    /// Placeholder resource attached to sent shares in the tests.
    #[derive(Debug, Clone, Serialize, Default)]
    struct TestResource();

    impl Resource for TestResource {
        const RESOURCE_TYPE: &str = "test-resource";

        // Not implemented; `todo!()` panics if this is ever reached.
        fn uri(&self) -> &str {
            todo!()
        }

        // Not implemented; `todo!()` panics if this is ever reached.
        fn name(&self) -> &str {
            todo!()
        }
    }

    impl TestClient<InMemoryShareRepo, InMemorySentShareRepo<TestResource>> {
        /// Create a client whose share repositories are in their default state.
        fn new() -> Self {
            Self {
                received_shares: InMemoryShareRepo::default(),
                sent_shares: InMemorySentShareRepo::<TestResource>::default(),
            }
        }
    }

    impl HttpClient for TestClient<InMemoryShareRepo, InMemorySentShareRepo<TestResource>> {
        /// Answer discovery requests for the two known test hosts; every other
        /// URL yields `Err("NOT_FOUND")`.
        async fn get(&self, url: &Uri) -> Result<String, String> {
            match url.to_string().as_str() {
                "https://test-receiver.example.org/.well-known/ocm" => {
                    Ok(serde_json::to_string(&Discovery {
                        enabled: true,
                        api_version: "1.2.0".to_string(),
                        end_point: "https://test-receiver.example.org".to_string(),
                        provider: Some("Test Receiver".to_string()),
                        ..Default::default()
                    })
                    .unwrap())
                }
                "https://test-provider.example.org/.well-known/ocm" => {
                    Ok(serde_json::to_string(&Discovery {
                        enabled: true,
                        api_version: "1.2.0".to_string(),
                        end_point: "https://test-provider.example.org".to_string(),
                        provider: Some("Test Provider".to_string()),
                        ..Default::default()
                    })
                    .unwrap())
                }
                _ => Err("NOT_FOUND".to_string()),
            }
        }

        /// Route POSTs to the notifications endpoint of either test host into
        /// `receive_notification`, backed by this client's repositories.
        /// Both hosts share the same repositories; results and errors are
        /// returned as JSON strings and the status code is discarded.
        async fn post(&self, url: &Uri, body: serde_json::Value) -> Result<String, String> {
            match url.to_string().as_str() {
                "https://test-receiver.example.org/notifications" => receive_notification(
                    &self.sent_shares,
                    &self.received_shares,
                    // FIXME How to get the sending server from notifications??
                    "test-provider.example.org".into(),
                    serde_json::from_value(body).unwrap(),
                )
                .await
                .map(|ok| serde_json::to_string(&ok).unwrap())
                .map_err(|(_code, err)| serde_json::to_string(&err).unwrap()),
                "https://test-provider.example.org/notifications" => receive_notification(
                    &self.sent_shares,
                    &self.received_shares,
                    // FIXME How to get the sending server from notifications??
                    "test-provider.example.org".into(),
                    serde_json::from_value(body).unwrap(),
                )
                .await
                .map(|ok| serde_json::to_string(&ok).unwrap())
                .map_err(|(_code, err)| serde_json::to_string(&err).unwrap()),
                _ => Err("NOT_FOUND".to_string()),
            }
        }

        fn allow_http(&self) -> bool {
            false
        }
    }

    /// A SHARE_DECLINED notification must fail while no matching sent share
    /// exists and succeed once the share has been inserted.
    #[tokio::test]
    async fn send_receive_share_declined() {
        let client = TestClient::new();

        let notification_share_declined = NewNotification {
            notification_type: "SHARE_DECLINED".to_string(),
            resource_type: "file".to_string(),
            provider_id: "test-share".to_string(),
            notification: None,
        };

        // The SHARE_DECLINED notification must fail as long as the referenced share does not exist.
        send_notification(
            &client,
            &discover(
                &client,
                &"test-provider.example.org".try_into().unwrap(),
            )
            .await
            .unwrap(),
            notification_share_declined.clone(),
        )
        .await
        .expect_err("There are no shares, SHARE_DECLINED notification should have failed!");

        // Sent share whose provider id matches the one in the notification.
        let new_share = NewShare {
            share_with: "recipient@test-receiver.example.org".try_into().unwrap(),
            name: "Test Share".to_string(),
            description: None,
            provider_id: "test-share".to_string(),
            owner: "owner@test-provider.example.org".try_into().unwrap(),
            sender: "owner@test-provider.example.org".try_into().unwrap(),
            owner_display_name: None,
            sender_display_name: None,
            share_type: ocm_types::common::ShareType::User,
            resource_type: "file".to_string(),
            expiration: None,
            protocol: ocm_types::share::Protocol {
                name: "multi".to_string(),
                webdav: Some(ocm_types::share::WebDavProperties {
                    access_types: vec![AccessType::Remote],
                    uri: "example.org/".to_string(),
                    shared_secret: Some("password".to_string()),
                    refresh_token: None,
                    permissions: vec![WebDavPermissions::Read],
                    requirements: vec![],
                }),
                ..Default::default()
            },
        };

        client
            .sent_shares
            .insert(new_share, TestResource())
            .await
            .unwrap();
        // With the share in place the same notification must now be accepted.
        send_notification(
            &client,
            &discover(
                &client,
                &"test-provider.example.org".try_into().unwrap(),
            )
            .await
            .unwrap(),
            notification_share_declined,
        )
        .await
        .expect("Notification should have been successful");
    }
}