1use anyhow::{bail, Result};
2use futures::channel::mpsc::{self, UnboundedReceiver};
3use invoker::invoker::{invoke, invoke_with_pool};
4use loader::reflection_checker::{ReflectionVersion, TryGetReflectionVersionResponse};
5use prost_reflect::DescriptorPool;
6use stream::fluid_stream_event::FluidStreamEvent;
7use tokio_util::sync::CancellationToken;
8
9pub(crate) mod invoker;
10pub mod loader;
11pub mod misc;
12pub mod stream;
13
14pub async fn list_from_server_reflection(
15 server_url: String,
16 reflection_version: Option<ReflectionVersion>,
17) -> Result<DescriptorPool> {
18 let pool = match reflection_version {
19 Some(version) => load_reflection_data(server_url, version),
20 None => {
21 let auto_detect_verison =
22 loader::reflection_checker::try_get_reflection_version(server_url.clone()).await?;
23
24 let version = match auto_detect_verison {
25 TryGetReflectionVersionResponse::DetectedVersion(version) => version,
26 TryGetReflectionVersionResponse::ConnectionError { error } => bail!(error),
27 TryGetReflectionVersionResponse::VersionUndetectable => {
28 bail!("Could not determine reflection version.")
29 }
30 };
31
32 load_reflection_data(server_url, version)
33 }
34 }
35 .await?;
36
37 Ok(pool)
38}
39
40async fn load_reflection_data(
41 server_url: String,
42 reflection_version: ReflectionVersion,
43) -> Result<DescriptorPool> {
44 Ok(match reflection_version {
45 ReflectionVersion::V1 => {
46 loader::reflection_loader_v1::load_from_server_reflection(server_url).await?
47 }
48 ReflectionVersion::V1Alpha => {
49 loader::reflection_loader_v1alpha::load_from_server_reflection(server_url).await?
50 }
51 })
52}
53
54pub async fn list_from_files(
55 file_paths: Vec<String>,
56 include_paths: Vec<String>,
57) -> Result<DescriptorPool> {
58 let pool = loader::file_loader::load_from_files(file_paths, include_paths)?;
59
60 Ok(pool)
61}
62
63pub async fn invoke_method(
64 server_url: String,
65 target_method: String,
66 data: Option<String>,
67 file_paths: Option<Vec<String>>,
68 include_paths: Option<Vec<String>>,
69 cancellation_token: CancellationToken,
70) -> Result<UnboundedReceiver<FluidStreamEvent>> {
71 let (tx, rx) = mpsc::unbounded::<FluidStreamEvent>();
72
73 tokio::spawn(invoke(
74 tx,
75 server_url,
76 target_method,
77 data,
78 file_paths,
79 include_paths,
80 cancellation_token,
81 ));
82
83 Ok(rx)
84}
85
86pub async fn invoke_method_raw(
87 pool_bytes: Vec<u8>,
88 server_url: String,
89 target_method: String,
90 data: Option<String>,
91 cancellation_token: CancellationToken,
92) -> Result<UnboundedReceiver<FluidStreamEvent>> {
93 let pool = DescriptorPool::decode(pool_bytes.as_slice())?;
94
95 let (tx, rx) = mpsc::unbounded::<FluidStreamEvent>();
96
97 tokio::spawn(invoke_with_pool(
98 tx,
99 pool,
100 server_url,
101 target_method,
102 data,
103 cancellation_token,
104 ));
105
106 Ok(rx)
107}