use std::sync::Arc;
use async_trait::async_trait;
use crate::{
    core::usubscription::{
        usubscription_uri, FetchSubscribersRequest, FetchSubscribersResponse,
        FetchSubscriptionsRequest, FetchSubscriptionsResponse, NotificationsRequest,
        NotificationsResponse, SubscriptionRequest, SubscriptionResponse, USubscription,
        UnsubscribeRequest, UnsubscribeResponse, RESOURCE_ID_FETCH_SUBSCRIBERS,
        RESOURCE_ID_FETCH_SUBSCRIPTIONS, RESOURCE_ID_REGISTER_FOR_NOTIFICATIONS,
        RESOURCE_ID_SUBSCRIBE, RESOURCE_ID_UNREGISTER_FOR_NOTIFICATIONS, RESOURCE_ID_UNSUBSCRIBE,
    },
    UStatus,
};
use super::{CallOptions, RpcClient};
pub struct RpcClientUSubscription {
    rpc_client: Arc<dyn RpcClient>,
}
impl RpcClientUSubscription {
    pub fn new(rpc_client: Arc<dyn RpcClient>) -> Self {
        RpcClientUSubscription { rpc_client }
    }
    fn default_call_options() -> CallOptions {
        CallOptions::for_rpc_request(5_000, None, None, None)
    }
}
#[async_trait]
impl USubscription for RpcClientUSubscription {
    async fn subscribe(
        &self,
        subscription_request: SubscriptionRequest,
    ) -> Result<SubscriptionResponse, UStatus> {
        self.rpc_client
            .invoke_proto_method::<_, SubscriptionResponse>(
                usubscription_uri(RESOURCE_ID_SUBSCRIBE),
                Self::default_call_options(),
                subscription_request,
            )
            .await
            .map_err(UStatus::from)
    }
    async fn unsubscribe(&self, unsubscribe_request: UnsubscribeRequest) -> Result<(), UStatus> {
        self.rpc_client
            .invoke_proto_method::<_, UnsubscribeResponse>(
                usubscription_uri(RESOURCE_ID_UNSUBSCRIBE),
                Self::default_call_options(),
                unsubscribe_request,
            )
            .await
            .map(|_response| ())
            .map_err(UStatus::from)
    }
    async fn fetch_subscriptions(
        &self,
        fetch_subscriptions_request: FetchSubscriptionsRequest,
    ) -> Result<FetchSubscriptionsResponse, UStatus> {
        self.rpc_client
            .invoke_proto_method::<_, FetchSubscriptionsResponse>(
                usubscription_uri(RESOURCE_ID_FETCH_SUBSCRIPTIONS),
                Self::default_call_options(),
                fetch_subscriptions_request,
            )
            .await
            .map_err(UStatus::from)
    }
    async fn register_for_notifications(
        &self,
        notifications_register_request: NotificationsRequest,
    ) -> Result<(), UStatus> {
        self.rpc_client
            .invoke_proto_method::<_, NotificationsResponse>(
                usubscription_uri(RESOURCE_ID_REGISTER_FOR_NOTIFICATIONS),
                Self::default_call_options(),
                notifications_register_request,
            )
            .await
            .map(|_response| ())
            .map_err(UStatus::from)
    }
    async fn unregister_for_notifications(
        &self,
        notifications_unregister_request: NotificationsRequest,
    ) -> Result<(), UStatus> {
        self.rpc_client
            .invoke_proto_method::<_, NotificationsResponse>(
                usubscription_uri(RESOURCE_ID_UNREGISTER_FOR_NOTIFICATIONS),
                Self::default_call_options(),
                notifications_unregister_request,
            )
            .await
            .map(|_response| ())
            .map_err(UStatus::from)
    }
    async fn fetch_subscribers(
        &self,
        fetch_subscribers_request: FetchSubscribersRequest,
    ) -> Result<FetchSubscribersResponse, UStatus> {
        self.rpc_client
            .invoke_proto_method::<_, FetchSubscribersResponse>(
                usubscription_uri(RESOURCE_ID_FETCH_SUBSCRIBERS),
                Self::default_call_options(),
                fetch_subscribers_request,
            )
            .await
            .map_err(UStatus::from)
    }
}
#[cfg(test)]
mod tests {
    use mockall::Sequence;
    use super::*;
    use crate::{
        communication::{rpc::MockRpcClient, UPayload},
        core::usubscription::{Request, SubscriptionResponse},
        UCode, UUri,
    };
    use std::sync::Arc;
    #[tokio::test]
    async fn test_subscribe_invokes_rpc_client() {
        let topic = UUri::try_from_parts("other", 0xd5a3, 0x01, 0xd3fe).unwrap();
        let request = SubscriptionRequest {
            topic: Some(topic).into(),
            ..Default::default()
        };
        let expected_request = request.clone();
        let mut rpc_client = MockRpcClient::new();
        let mut seq = Sequence::new();
        rpc_client
            .expect_invoke_method()
            .once()
            .in_sequence(&mut seq)
            .withf(|method, _options, payload| {
                method == &usubscription_uri(RESOURCE_ID_SUBSCRIBE) && payload.is_some()
            })
            .return_const(Err(crate::communication::ServiceInvocationError::Internal(
                "internal error".to_string(),
            )));
        rpc_client
            .expect_invoke_method()
            .once()
            .in_sequence(&mut seq)
            .withf(move |method, _options, payload| {
                let request = payload
                    .to_owned()
                    .unwrap()
                    .extract_protobuf::<SubscriptionRequest>()
                    .unwrap();
                request == expected_request && method == &usubscription_uri(RESOURCE_ID_SUBSCRIBE)
            })
            .returning(move |_method, _options, _payload| {
                let response = SubscriptionResponse {
                    ..Default::default()
                };
                Ok(Some(UPayload::try_from_protobuf(response).unwrap()))
            });
        let usubscription_client = RpcClientUSubscription::new(Arc::new(rpc_client));
        assert!(usubscription_client
            .subscribe(request.clone())
            .await
            .is_err_and(|e| e.get_code() == UCode::INTERNAL));
        assert!(usubscription_client.subscribe(request).await.is_ok());
    }
    #[tokio::test]
    async fn test_unsubscribe_invokes_rpc_client() {
        let topic = UUri::try_from_parts("other", 0xd5a3, 0x01, 0xd3fe).unwrap();
        let request = UnsubscribeRequest {
            topic: Some(topic).into(),
            ..Default::default()
        };
        let expected_request = request.clone();
        let mut rpc_client = MockRpcClient::new();
        let mut seq = Sequence::new();
        rpc_client
            .expect_invoke_method()
            .once()
            .in_sequence(&mut seq)
            .withf(|method, _options, payload| {
                method == &usubscription_uri(RESOURCE_ID_UNSUBSCRIBE) && payload.is_some()
            })
            .return_const(Err(crate::communication::ServiceInvocationError::Internal(
                "internal error".to_string(),
            )));
        rpc_client
            .expect_invoke_method()
            .once()
            .in_sequence(&mut seq)
            .withf(move |method, _options, payload| {
                let request = payload
                    .to_owned()
                    .unwrap()
                    .extract_protobuf::<UnsubscribeRequest>()
                    .unwrap();
                request == expected_request && method == &usubscription_uri(RESOURCE_ID_UNSUBSCRIBE)
            })
            .returning(move |_method, _options, _payload| {
                let response = UnsubscribeResponse {
                    ..Default::default()
                };
                Ok(Some(UPayload::try_from_protobuf(response).unwrap()))
            });
        let usubscription_client = RpcClientUSubscription::new(Arc::new(rpc_client));
        assert!(usubscription_client
            .unsubscribe(request.clone())
            .await
            .is_err_and(|e| e.get_code() == UCode::INTERNAL));
        assert!(usubscription_client.unsubscribe(request).await.is_ok());
    }
    #[tokio::test]
    async fn test_fetch_subscriptions_invokes_rpc_client() {
        let topic = UUri::try_from_parts("other", 0xd5a3, 0x01, 0xd3fe).unwrap();
        let request = FetchSubscriptionsRequest {
            request: Some(Request::Topic(topic)),
            ..Default::default()
        };
        let expected_request = request.clone();
        let mut rpc_client = MockRpcClient::new();
        let mut seq = Sequence::new();
        rpc_client
            .expect_invoke_method()
            .once()
            .in_sequence(&mut seq)
            .withf(|method, _options, payload| {
                method == &usubscription_uri(RESOURCE_ID_FETCH_SUBSCRIPTIONS) && payload.is_some()
            })
            .return_const(Err(crate::communication::ServiceInvocationError::Internal(
                "internal error".to_string(),
            )));
        rpc_client
            .expect_invoke_method()
            .once()
            .in_sequence(&mut seq)
            .withf(move |method, _options, payload| {
                let request = payload
                    .to_owned()
                    .unwrap()
                    .extract_protobuf::<FetchSubscriptionsRequest>()
                    .unwrap();
                request == expected_request
                    && method == &usubscription_uri(RESOURCE_ID_FETCH_SUBSCRIPTIONS)
            })
            .returning(move |_method, _options, _payload| {
                let response = FetchSubscriptionsResponse {
                    ..Default::default()
                };
                Ok(Some(UPayload::try_from_protobuf(response).unwrap()))
            });
        let usubscription_client = RpcClientUSubscription::new(Arc::new(rpc_client));
        assert!(usubscription_client
            .fetch_subscriptions(request.clone())
            .await
            .is_err_and(|e| e.get_code() == UCode::INTERNAL));
        assert!(usubscription_client
            .fetch_subscriptions(request)
            .await
            .is_ok());
    }
    #[tokio::test]
    async fn test_fetch_subscribers_invokes_rpc_client() {
        let topic = UUri::try_from_parts("other", 0xd5a3, 0x01, 0xd3fe).unwrap();
        let request = FetchSubscribersRequest {
            topic: Some(topic).into(),
            ..Default::default()
        };
        let expected_request = request.clone();
        let mut rpc_client = MockRpcClient::new();
        let mut seq = Sequence::new();
        rpc_client
            .expect_invoke_method()
            .once()
            .in_sequence(&mut seq)
            .withf(|method, _options, payload| {
                method == &usubscription_uri(RESOURCE_ID_FETCH_SUBSCRIBERS) && payload.is_some()
            })
            .return_const(Err(crate::communication::ServiceInvocationError::Internal(
                "internal error".to_string(),
            )));
        rpc_client
            .expect_invoke_method()
            .once()
            .in_sequence(&mut seq)
            .withf(move |method, _options, payload| {
                let request = payload
                    .to_owned()
                    .unwrap()
                    .extract_protobuf::<FetchSubscribersRequest>()
                    .unwrap();
                request == expected_request
                    && method == &usubscription_uri(RESOURCE_ID_FETCH_SUBSCRIBERS)
            })
            .returning(move |_method, _options, _payload| {
                let response = FetchSubscribersResponse {
                    ..Default::default()
                };
                Ok(Some(UPayload::try_from_protobuf(response).unwrap()))
            });
        let usubscription_client = RpcClientUSubscription::new(Arc::new(rpc_client));
        assert!(usubscription_client
            .fetch_subscribers(request.clone())
            .await
            .is_err_and(|e| e.get_code() == UCode::INTERNAL));
        assert!(usubscription_client
            .fetch_subscribers(request)
            .await
            .is_ok());
    }
    #[tokio::test]
    async fn test_register_for_notifications_invokes_rpc_client() {
        let topic = UUri::try_from_parts("other", 0xd5a3, 0x01, 0xd3fe).unwrap();
        let request = NotificationsRequest {
            topic: Some(topic).into(),
            ..Default::default()
        };
        let expected_request = request.clone();
        let mut rpc_client = MockRpcClient::new();
        let mut seq = Sequence::new();
        rpc_client
            .expect_invoke_method()
            .once()
            .in_sequence(&mut seq)
            .withf(|method, _options, payload| {
                method == &usubscription_uri(RESOURCE_ID_REGISTER_FOR_NOTIFICATIONS)
                    && payload.is_some()
            })
            .return_const(Err(crate::communication::ServiceInvocationError::Internal(
                "internal error".to_string(),
            )));
        rpc_client
            .expect_invoke_method()
            .once()
            .in_sequence(&mut seq)
            .withf(move |method, _options, payload| {
                let request = payload
                    .to_owned()
                    .unwrap()
                    .extract_protobuf::<NotificationsRequest>()
                    .unwrap();
                request == expected_request
                    && method == &usubscription_uri(RESOURCE_ID_REGISTER_FOR_NOTIFICATIONS)
            })
            .returning(move |_method, _options, _payload| {
                let response = NotificationsResponse {
                    ..Default::default()
                };
                Ok(Some(UPayload::try_from_protobuf(response).unwrap()))
            });
        let usubscription_client = RpcClientUSubscription::new(Arc::new(rpc_client));
        assert!(usubscription_client
            .register_for_notifications(request.clone())
            .await
            .is_err_and(|e| e.get_code() == UCode::INTERNAL));
        assert!(usubscription_client
            .register_for_notifications(request)
            .await
            .is_ok());
    }
    #[tokio::test]
    async fn test_unregister_for_notifications_invokes_rpc_client() {
        let topic = UUri::try_from_parts("other", 0xd5a3, 0x01, 0xd3fe).unwrap();
        let request = NotificationsRequest {
            topic: Some(topic).into(),
            ..Default::default()
        };
        let expected_request = request.clone();
        let mut rpc_client = MockRpcClient::new();
        let mut seq = Sequence::new();
        rpc_client
            .expect_invoke_method()
            .once()
            .in_sequence(&mut seq)
            .withf(|method, _options, payload| {
                method == &usubscription_uri(RESOURCE_ID_UNREGISTER_FOR_NOTIFICATIONS)
                    && payload.is_some()
            })
            .return_const(Err(crate::communication::ServiceInvocationError::Internal(
                "internal error".to_string(),
            )));
        rpc_client
            .expect_invoke_method()
            .once()
            .in_sequence(&mut seq)
            .withf(move |method, _options, payload| {
                let request = payload
                    .to_owned()
                    .unwrap()
                    .extract_protobuf::<NotificationsRequest>()
                    .unwrap();
                request == expected_request
                    && method == &usubscription_uri(RESOURCE_ID_UNREGISTER_FOR_NOTIFICATIONS)
            })
            .returning(move |_method, _options, _payload| {
                let response = NotificationsResponse {
                    ..Default::default()
                };
                Ok(Some(UPayload::try_from_protobuf(response).unwrap()))
            });
        let usubscription_client = RpcClientUSubscription::new(Arc::new(rpc_client));
        assert!(usubscription_client
            .unregister_for_notifications(request.clone())
            .await
            .is_err_and(|e| e.get_code() == UCode::INTERNAL));
        assert!(usubscription_client
            .unregister_for_notifications(request)
            .await
            .is_ok());
    }
}