1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
use crate::node::{LocalNotify, Requests};
use crate::{App, Error, LocalServiceId, NodeId, Request, Response, ServiceId};
use anyhow::Result;
use futures::channel::mpsc;
use futures::future::AbortHandle;
use futures::lock::Mutex;
use futures::SinkExt;
use potatonet_common::{bus_message, Context};
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;

/// Node context handed to a service while it handles a request or notification.
///
/// It carries the caller identity together with the handles needed to talk to
/// other services (local or remote) and to stop the node.
pub struct NodeContext<'a> {
    /// Origin of the current request or notification, if any.
    pub(crate) from: Option<ServiceId>,

    /// Name of the current service.
    pub(crate) service_name: &'a str,

    /// Id of the current node.
    pub(crate) node_id: NodeId,

    /// Node-local id of the current service.
    pub(crate) local_service_id: LocalServiceId,

    /// The local app; its service tables are consulted to reach local services.
    pub(crate) app: &'a App,

    /// Sender for outgoing bus messages (requests and notifications).
    pub(crate) tx: mpsc::Sender<bus_message::Message>,

    /// Sender used to deliver notifications to local services.
    pub(crate) tx_local_notify: mpsc::UnboundedSender<LocalNotify>,

    /// Table of requests that have not completed yet.
    pub(crate) requests: Arc<Mutex<Requests>>,

    /// Handle used to stop the node from running.
    pub(crate) abort_handle: AbortHandle,
}

impl<'a> NodeContext<'a> {
    /// Returns the id of the service the current request or notification came
    /// from, or `None` when no origin is recorded.
    pub fn from(&self) -> Option<ServiceId> {
        let Self { from, .. } = self;
        *from
    }

    /// Returns the name of the current service.
    pub fn service_name(&self) -> &str {
        self.service_name
    }

    /// Returns the global id of the current service, built from its node-local
    /// id and the id of this node.
    pub fn service_id(&self) -> ServiceId {
        let node = self.node_id;
        let local = self.local_service_id;
        local.to_global(node)
    }

    /// Stops the node by triggering its abort handle.
    pub fn shutdown_node(&self) {
        let handle = &self.abort_handle;
        handle.abort();
    }
}

#[async_trait::async_trait]
impl<'a> Context for NodeContext<'a> {
    async fn call<T, R>(&self, service_name: &str, request: Request<T>) -> Result<Response<R>>
    where
        T: Serialize + Send + Sync,
        R: DeserializeOwned + Send + Sync,
    {
        trace!("call. service={} method={}", service_name, request.method);

        if self.app.services_map.contains_key(service_name) {
            // 优先调用本地服务
            if let Some(lid) = self.app.services_map.get(service_name) {
                if let Some((_, init, service)) = self.app.services.get(lid.to_u32() as usize) {
                    if init.load(Ordering::Relaxed) {
                        let resp = service.call(self, request.to_bytes()).await?;
                        let new_resp = Response::from_bytes(resp);
                        return Ok(new_resp);
                    }
                }
            }
        }

        let (seq, rx) = self.requests.lock().await.add();
        let request = request.to_bytes();
        self.tx
            .clone()
            .send(bus_message::Message::Req {
                seq,
                from: Some(self.local_service_id),
                to_service: service_name.to_string(),
                method: request.method,
                data: request.data,
            })
            .await
            .ok();
        match async_std::future::timeout(Duration::from_secs(5), rx).await {
            Ok(Ok(Ok(resp))) => Ok(Response::<R>::from_bytes(resp)),
            Ok(Ok(Err(err))) => Err(anyhow!(err)),
            Ok(Err(_)) => {
                let mut requests = self.requests.lock().await;
                requests.remove(seq);
                Err(Error::Internal.into())
            }
            Err(_) => {
                let mut requests = self.requests.lock().await;
                requests.remove(seq);
                Err(Error::Timeout.into())
            }
        }
    }

    async fn notify<T: Serialize + Send + Sync>(&self, service_name: &str, request: Request<T>) {
        trace!("notify. service={} method={}", service_name, request.method);

        let request = request.to_bytes();

        // 通知本地服务
        if let Some(lid) = self.app.services_map.get(service_name) {
            self.tx_local_notify
                .clone()
                .send(LocalNotify {
                    from: self.local_service_id,
                    to: *lid,
                    request: request.clone(),
                })
                .await
                .ok();
        }

        // 通知远程服务
        self.tx
            .clone()
            .send(bus_message::Message::SendNotify {
                from: Some(self.local_service_id),
                to_service: service_name.to_string(),
                method: request.method,
                data: request.data,
            })
            .await
            .ok();
    }
}