apache_dubbo/
framework.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18use std::collections::HashMap;
19use std::pin::Pin;
20
21use futures::future;
22use futures::Future;
23use futures::FutureExt;
24
25use crate::common::url::Url;
26use crate::protocol::triple::triple_invoker::TripleInvoker;
27use crate::protocol::triple::triple_protocol::TripleProtocol;
28use crate::protocol::{Exporter, Protocol};
29use dubbo_config::{get_global_config, RootConfig};
30
31pub type BoxExporter = Box<dyn Exporter<InvokerType = TripleInvoker>>;
32// Invoker是否可以基于hyper写一个通用的
33
34#[derive(Default)]
35pub struct Dubbo {
36    protocols: HashMap<String, Vec<Url>>,
37    config: Option<RootConfig>,
38}
39
40impl Dubbo {
41    pub fn new() -> Dubbo {
42        Self {
43            protocols: HashMap::new(),
44            config: None,
45        }
46    }
47
48    pub fn with_config(mut self, c: RootConfig) -> Self {
49        self.config = Some(c);
50        self
51    }
52
53    pub fn init(&mut self) {
54        tracing_subscriber::fmt::init();
55
56        if self.config.is_none() {
57            self.config = Some(get_global_config())
58        }
59
60        let conf = self.config.as_ref().unwrap();
61        tracing::debug!("global conf: {:?}", conf);
62        for (_, c) in conf.service.iter() {
63            let u = if c.protocol_configs.is_empty() {
64                let protocol = match conf.protocols.get(&c.protocol) {
65                    Some(v) => v.to_owned(),
66                    None => {
67                        tracing::warn!("protocol {:?} not exists", c.protocol);
68                        continue;
69                    }
70                };
71                let protocol_url = format!("{}/{}", protocol.to_url(), c.name.clone(),);
72                Url::from_url(&protocol_url)
73            } else {
74                let protocol = match c.protocol_configs.get(&c.protocol) {
75                    Some(v) => v.to_owned(),
76                    None => {
77                        tracing::warn!("protocol {:?} not exists", c.protocol);
78                        continue;
79                    }
80                };
81                let protocol_url = format!("{}/{}", protocol.to_url(), c.name.clone(),);
82                Url::from_url(&protocol_url)
83            };
84            tracing::info!("url: {:?}", u);
85            if u.is_none() {
86                continue;
87            }
88
89            let u = u.unwrap();
90            if self.protocols.get(&c.protocol).is_some() {
91                self.protocols.get_mut(&c.protocol).unwrap().push(u);
92            } else {
93                self.protocols.insert(c.protocol.clone(), vec![u]);
94            }
95        }
96    }
97
98    pub async fn start(&mut self) {
99        self.init();
100
101        // TODO: server registry
102
103        let mut async_vec: Vec<Pin<Box<dyn Future<Output = BoxExporter> + Send>>> = Vec::new();
104        for (key, c) in self.protocols.iter() {
105            match key.as_str() {
106                "triple" => {
107                    let pro = Box::new(TripleProtocol::new());
108                    for u in c.iter() {
109                        let tri_fut = pro
110                            .clone()
111                            .export(u.clone())
112                            .map(|res| Box::new(res) as BoxExporter)
113                            .boxed();
114                        async_vec.push(tri_fut);
115                    }
116                }
117                _ => {
118                    tracing::error!("protocol {:?} not implemented", key);
119                }
120            }
121        }
122
123        let _res = future::join_all(async_vec).await;
124    }
125}