hyper_function_core/
lib.rs

1use dashmap::DashMap;
2use gateway::gateway::Gateway;
3use rusty_ulid::generate_ulid_string;
4use server::{
5    server::Server,
6    socket::{Action, ActionSendMessage},
7};
8use std::{env, fs::read_to_string, path::Path};
9
10use once_cell::sync::OnceCell;
11use tokio::{
12    runtime::{Builder, Runtime},
13    sync::mpsc::{self, error::TryRecvError},
14};
15
16mod codec;
17mod gateway;
18mod server;
19
20pub static mut APP_ID: String = String::new();
21pub static mut UPSTREAM_ID: String = String::new();
22
23pub static RUNTIME: OnceCell<Runtime> = OnceCell::new();
24
25pub static SOCKET_CHANS: OnceCell<DashMap<String, mpsc::UnboundedSender<Action>>> = OnceCell::new();
26
27pub static mut READ_CHAN_RX: OnceCell<mpsc::UnboundedReceiver<Vec<u8>>> = OnceCell::new();
28pub static READ_CHAN_TX: OnceCell<mpsc::UnboundedSender<Vec<u8>>> = OnceCell::new();
29
30pub static mut GATEWAY_WRITE_CHAN_RX: OnceCell<mpsc::UnboundedReceiver<(String, Vec<u8>)>> =
31    OnceCell::new();
32pub static GATEWAY_WRITE_CHAN_TX: OnceCell<mpsc::UnboundedSender<(String, Vec<u8>)>> =
33    OnceCell::new();
34
35pub static INIT_ARGS: OnceCell<codec::InitArgs> = OnceCell::new();
36pub static JSON_CONFIG: OnceCell<codec::JsonConfig> = OnceCell::new();
37
38pub fn init(args: Vec<u8>) -> Vec<u8> {
39    if RUNTIME.get().is_some() {
40        panic!("Instance already initialized");
41    }
42
43    let args = codec::InitArgs::from_buf(args);
44
45    let mut config_path;
46    if env::var("HFN_CONFIG_PATH").is_ok() {
47        let path = env::var("HFN_CONFIG_PATH").unwrap();
48        config_path = Path::new(&path).to_owned();
49    } else if let Some(hfn_config_path) = &args.hfn_config_path {
50        config_path = Path::new(hfn_config_path).to_owned();
51    } else {
52        config_path = env::current_dir().unwrap();
53        config_path.push("hfn.json");
54    }
55
56    if !config_path.exists() {
57        panic!("hfn.json file not found: {}", config_path.display());
58    }
59
60    let json_config =
61        codec::JsonConfig::from_str(read_to_string(config_path).expect("failed to read hfn.json"));
62
63    let mut runtime_builder = Builder::new_multi_thread();
64
65    if let Some(tokio_work_threads) = &args.tokio_work_threads {
66        runtime_builder.worker_threads(*tokio_work_threads);
67    }
68
69    runtime_builder.thread_name("hfn-core-runtime-worker");
70    runtime_builder.enable_all();
71    let runtime = runtime_builder.build().expect("unable build tokio runtime");
72    RUNTIME.set(runtime).unwrap();
73
74    let (hfn_packages, hfn_modules, hfn_models, hfn_hfns, hfn_rpcs, hfn_schemas, hfn_fields) =
75        json_config.to_hfn_struct();
76
77    let upstream_id;
78    if let Some(id) = &args.upstream_id {
79        upstream_id = id.to_owned();
80    } else {
81        upstream_id = generate_ulid_string();
82    }
83
84    let (read_tx, read_rx) = mpsc::unbounded_channel::<Vec<u8>>();
85
86    unsafe {
87        APP_ID = json_config.appid.clone();
88        UPSTREAM_ID = upstream_id.clone();
89
90        READ_CHAN_RX.set(read_rx).unwrap();
91    }
92
93    READ_CHAN_TX.set(read_tx).unwrap();
94
95    SOCKET_CHANS.set(DashMap::new()).unwrap();
96    INIT_ARGS.set(args).unwrap();
97    JSON_CONFIG.set(json_config).unwrap();
98
99    let result = codec::InitResult {
100        upstream_id,
101        packages: hfn_packages,
102        modules: hfn_modules,
103        models: hfn_models,
104        hfns: hfn_hfns,
105        rpcs: hfn_rpcs,
106        schemas: hfn_schemas,
107        fields: hfn_fields,
108    };
109    result.to_buf()
110}
111
112pub fn run() {
113    let init_args = INIT_ARGS.get().unwrap();
114    let json_config = JSON_CONFIG.get().unwrap();
115    let upstream_id = unsafe { UPSTREAM_ID.clone() };
116
117    let runtime = RUNTIME.get().unwrap();
118
119    if !init_args.dev {
120        let addr = init_args.addr.as_ref().unwrap().clone();
121        runtime.spawn(async move {
122            let server = Server { addr };
123            server.listen().await
124        });
125    } else {
126        let mut url = url::Url::parse(&json_config.dev.devtools).unwrap();
127        url.set_path("/us");
128
129        url.query_pairs_mut().append_pair("usid", &upstream_id);
130
131        url.query_pairs_mut()
132            .append_pair("appid", &json_config.appid);
133
134        url.query_pairs_mut()
135            .append_pair("ver", env!("CARGO_PKG_VERSION"));
136
137        url.query_pairs_mut().append_pair("sdk", &init_args.sdk);
138
139        let (write_tx, write_rx) = mpsc::unbounded_channel::<(String, Vec<u8>)>();
140
141        GATEWAY_WRITE_CHAN_TX.set(write_tx).unwrap();
142
143        let read_tx = READ_CHAN_TX.get().unwrap().clone();
144        let gateway = Gateway {
145            dev: true,
146            runway: url,
147            read_tx,
148        };
149
150        runtime.spawn(async move {
151            gateway.connect(write_rx).await;
152        });
153
154        // todo add package signature for querystring
155    }
156}
157
158pub fn read() -> Option<Vec<u8>> {
159    let read_rx = unsafe { READ_CHAN_RX.get_mut().unwrap() };
160    let data = read_rx.blocking_recv();
161    data
162}
163
/// Outcome of a non-blocking read attempt made by `try_read`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TryReadRes {
    /// A message was available; carries its raw bytes.
    DATA(Vec<u8>),
    /// No message is currently queued.
    EMPTY,
    /// The channel is disconnected and will not yield further messages.
    CLOSED,
}
169
170pub fn try_read() -> TryReadRes {
171    let read_rx = unsafe { READ_CHAN_RX.get_mut().unwrap() };
172    match read_rx.try_recv() {
173        Ok(data) => TryReadRes::DATA(data),
174        Err(e) => match e {
175            TryRecvError::Empty => TryReadRes::EMPTY,
176            TryRecvError::Disconnected => TryReadRes::CLOSED,
177        },
178    }
179}
180
181pub async fn read_async() -> Option<Vec<u8>> {
182    let read_rx = unsafe { READ_CHAN_RX.get_mut().unwrap() };
183    let data = read_rx.recv().await;
184    data
185}
186
187pub fn send_message(socket_id: String, payload: Vec<u8>) {
188    if let Some(gateway_write_tx) = GATEWAY_WRITE_CHAN_TX.get() {
189        gateway_write_tx.send((socket_id, payload)).unwrap();
190        return;
191    }
192
193    let socket_chans = SOCKET_CHANS.get().unwrap();
194    if let Some(socket_chan) = socket_chans.get(&socket_id) {
195        socket_chan.send(Action::SendMessage(ActionSendMessage { payload }));
196
197        return;
198    }
199}