architect_sdk/
client.rs

1//! General purpose client for Architect.
2//!
3//! Provides a convenience interface for the underlying gRPC calls, handles
4//! service discovery and authentication in the background, and also implements
5//! some useful utilities and patterns for marketdata and orderflow.
6
7use anyhow::{anyhow, bail, Result};
8use arc_swap::ArcSwapOption;
9use architect_api::{
10    accounts::*,
11    auth::*,
12    core::*,
13    folio::*,
14    grpc::service::{
15        accounts_client::*, auth_client::*, core_client::*, folio_client::*,
16        marketdata_client::*, oms_client::*, orderflow_client::*, symbology_client::*,
17    },
18    marketdata::{CandleWidth, *},
19    oms::*,
20    orderflow::*,
21    symbology::{protocol::*, *},
22    utils::pagination::OffsetAndLimit,
23    *,
24};
25use arcstr::ArcStr;
26use chrono::{DateTime, NaiveTime, Utc};
27use hickory_resolver::TokioResolver;
28use log::{debug, error, info};
29use parking_lot::RwLock;
30use std::{collections::HashMap, str::FromStr, sync::Arc};
31use tonic::{
32    codec::Streaming,
33    metadata::MetadataValue,
34    transport::{Certificate, Channel, ClientTlsConfig, Endpoint, Uri},
35    IntoRequest, Request,
36};
37
38const ARCHITECT_CA: &[u8] = include_bytes!("ca.crt");
39const PAPER_GRPC_PORT: u16 = 10081;
40
41// convenience re-exports
42pub use architect_api::{
43    folio::{HistoricalFillsRequest, HistoricalOrdersRequest},
44    oms::{CancelOrderRequest, PlaceOrderRequest},
45};
46
47#[derive(Debug, Clone)]
48#[allow(dead_code)]
49pub struct Architect {
50    core: Channel,
51    symbology: Arc<RwLock<Option<Channel>>>,
52    marketdata: Arc<RwLock<HashMap<MarketdataVenue, Channel>>>,
53    hmart: Channel,
54    ca: Arc<Certificate>,
55    api_key: ArcStr,
56    api_secret: ArcStr,
57    paper_trading: bool,
58    jwt: Arc<ArcSwapOption<(ArcStr, DateTime<Utc>)>>,
59}
60
61impl Architect {
62    // ------------------------------------------------------------
63    // Initialization and configuration
64    // ------------------------------------------------------------
65
66    pub async fn connect(
67        api_key: impl AsRef<str>,
68        api_secret: impl AsRef<str>,
69        paper_trading: bool,
70    ) -> Result<Self> {
71        Self::connect_to("app.architect.co", api_key, api_secret, paper_trading).await
72    }
73
74    pub async fn connect_to(
75        endpoint: impl AsRef<str>,
76        api_key: impl AsRef<str>,
77        api_secret: impl AsRef<str>,
78        paper_trading: bool,
79    ) -> Result<Self> {
80        if paper_trading {
81            println!("🧻 \x1b[30;43m YOU ARE IN PAPER TRADING MODE \x1b[0m");
82        }
83
84        let endpoint = Self::resolve_endpoint(endpoint, paper_trading).await?;
85        debug!("connecting to endpoint: {}", endpoint.uri());
86        let hmart_endpoint =
87            Self::resolve_endpoint("https://historical.marketdata.architect.co", false)
88                .await?;
89        let core = endpoint.connect().await?;
90        let marketdata = Arc::new(RwLock::new(HashMap::new()));
91        let hmart = hmart_endpoint.connect_lazy();
92        let t = Self {
93            core,
94            symbology: Arc::new(RwLock::new(None)),
95            marketdata,
96            hmart,
97            ca: Arc::new(Certificate::from_pem(ARCHITECT_CA)),
98            api_key: ArcStr::from(api_key.as_ref()),
99            api_secret: ArcStr::from(api_secret.as_ref()),
100            paper_trading,
101            jwt: Arc::new(ArcSwapOption::empty()),
102        };
103        t.refresh_jwt(false).await?;
104        t.discover_services().await?;
105        Ok(t)
106    }
107
108    /// Resolve a service gRPC endpoint given its URL.
109    ///
110    /// If localhost or an IP address is given, it will be returned as is.
111    ///
112    /// If a domain name is given, it will be resolved to an IP address and
113    /// port using SRV records.  If a port is specified in `url`, it always
114    /// takes precedence over the port found in SRV records.
115    ///
116    /// If paper_trading is true and the host is app.architect.co or staging.architect.co,
117    /// the port will be overridden to PAPER_GRPC_PORT.
118    pub async fn resolve_endpoint(
119        endpoint: impl AsRef<str>,
120        paper_trading: bool,
121    ) -> Result<Endpoint> {
122        let uri: Uri = endpoint.as_ref().parse()?;
123        let host = uri
124            .host()
125            .ok_or_else(|| anyhow!("no host name or ip address in endpoint"))?;
126        let use_ssl = uri.scheme_str() == Some("https")
127            || (uri.scheme_str() != Some("http") && host.ends_with(".architect.co"));
128        let scheme = if use_ssl { "https" } else { "http" };
129        let resolved = match uri.port() {
130            Some(port) => {
131                format!("{scheme}://{host}:{port}")
132            }
133            None => {
134                // no port provided, lookup SRV records
135                let resolver = TokioResolver::builder_tokio()?.build();
136                let records = resolver.srv_lookup(host).await?;
137                let rec = records
138                    .iter()
139                    .next()
140                    .ok_or_else(|| anyhow!("no SRV records found for host: {host}"))?;
141                let target = rec.target();
142                let mut port = rec.port();
143                if paper_trading {
144                    let target_str = target.to_string();
145                    if target_str.contains("app.architect.co")
146                        || target_str.contains("staging.architect.co")
147                    {
148                        port = PAPER_GRPC_PORT;
149                    }
150                }
151                format!("{scheme}://{target}:{port}")
152            }
153        };
154        debug!("resolved endpoint: {} -> {}", endpoint.as_ref(), resolved);
155        let mut endpoint = Endpoint::try_from(resolved)?
156            .connect_timeout(std::time::Duration::from_secs(3));
157        if use_ssl {
158            endpoint =
159                endpoint.tls_config(ClientTlsConfig::new().with_enabled_roots())?;
160        }
161        Ok(endpoint)
162    }
163
164    /// Refresh the JWT if it's nearing expiration (within 1 minute) or if force is true
165    pub async fn refresh_jwt(&self, force: bool) -> Result<()> {
166        if !force {
167            if let Some(jwt_and_expiration) = self.jwt.load_full() {
168                let (_jwt, expiration) = &*jwt_and_expiration;
169                let now = Utc::now();
170                if (*expiration - now).num_seconds() > 60 {
171                    return Ok(());
172                }
173            }
174        }
175        info!("refreshing JWT...");
176        let mut client = AuthClient::new(self.core.clone());
177        let req = CreateJwtRequest {
178            api_key: self.api_key.to_string(),
179            api_secret: self.api_secret.to_string(),
180            grants: None,
181        };
182        let res = client.create_jwt(req).await?;
183        let jwt: ArcStr = format!("Bearer {}", res.into_inner().jwt).into();
184        let expiration = Utc::now() + chrono::Duration::seconds(3600);
185        self.jwt.store(Some(Arc::new((jwt, expiration))));
186        Ok(())
187    }
188
189    async fn with_jwt<R, T>(&self, request: R) -> Result<Request<T>>
190    where
191        R: IntoRequest<T>,
192    {
193        if let Err(e) = self.refresh_jwt(false).await {
194            error!("failed to refresh JWT: {e:?}");
195        }
196        match self.jwt.load_full() {
197            Some(jwt_and_expiration) => {
198                let (jwt, _expiration) = &*jwt_and_expiration;
199                let mut req = request.into_request();
200                req.metadata_mut()
201                    .insert("authorization", MetadataValue::from_str(jwt.as_str())?);
202                Ok(req)
203            }
204            _ => Ok(request.into_request()),
205        }
206    }
207
208    /// Discover service endpoints from Architect.
209    ///
210    /// The Architect core is responsible for telling you where to find services
211    /// like symbology and marketdata as per its configuration.  You can also
212    /// manually set endpoints by calling set_symbology and set_marketdata
213    /// directly.
214    pub async fn discover_services(&self) -> Result<()> {
215        info!("discovering service endpoints...");
216        let mut client = CoreClient::new(self.core.clone());
217        let req = ConfigRequest {};
218        let res = client.config(req).await?;
219        let config = res.into_inner();
220        if let Some(symbology) = config.symbology {
221            info!("setting symbology endpoint: {}", symbology);
222            let endpoint = Self::resolve_endpoint(symbology, false).await?;
223            let channel = endpoint.connect_lazy();
224            let mut symbology = self.symbology.write();
225            *symbology = Some(channel);
226        }
227        for (venue, endpoint) in config.marketdata {
228            info!("setting marketdata endpoint for {venue}: {endpoint}");
229            let endpoint = Self::resolve_endpoint(endpoint, false).await?;
230            let channel = endpoint.connect_lazy();
231            let mut marketdata = self.marketdata.write();
232            marketdata.insert(venue, channel);
233        }
234        Ok(())
235    }
236
237    fn _symbology(&self) -> Result<Channel> {
238        let symbology = self.symbology.read();
239        if let Some(channel) = &*symbology {
240            Ok(channel.clone())
241        } else {
242            bail!("no symbology endpoint");
243        }
244    }
245
246    /// Manually set the symbology endpoint.
247    pub async fn set_symbology(&self, endpoint: impl AsRef<str>) -> Result<()> {
248        let endpoint = Self::resolve_endpoint(endpoint, false).await?;
249        info!("setting symbology endpoint: {}", endpoint.uri());
250        let channel = endpoint.connect_lazy();
251        let mut symbology = self.symbology.write();
252        *symbology = Some(channel);
253        Ok(())
254    }
255
256    /// Manually set the marketdata endpoint for a venue.
257    pub async fn set_marketdata(
258        &self,
259        venue: MarketdataVenue,
260        endpoint: impl AsRef<str>,
261    ) -> Result<()> {
262        let endpoint = Self::resolve_endpoint(endpoint, false).await?;
263        info!("setting marketdata endpoint for {venue}: {}", endpoint.uri());
264        let channel = endpoint.connect_lazy();
265        let mut marketdata = self.marketdata.write();
266        marketdata.insert(venue, channel);
267        Ok(())
268    }
269
270    fn marketdata(&self, venue: impl AsRef<str>) -> Result<Channel> {
271        let venue = venue.as_ref();
272        let channel = self
273            .marketdata
274            .read()
275            .get(venue)
276            .ok_or_else(|| anyhow!("no marketdata endpoint set for {venue}"))?
277            .clone();
278        Ok(channel)
279    }
280
281    /// Manually set the hmart (historical marketdata service) endpoint.
282    pub async fn set_hmart(&mut self, endpoint: impl AsRef<str>) -> Result<()> {
283        let endpoint = Self::resolve_endpoint(endpoint, false).await?;
284        info!("setting hmart endpoint: {}", endpoint.uri());
285        self.hmart = endpoint.connect_lazy();
286        Ok(())
287    }
288
289    // ------------------------------------------------------------
290    // Symbology
291    // ------------------------------------------------------------
292
293    /// List all symbols.
294    ///
295    /// If marketdata is specified, query the marketdata endpoint directly;
296    /// this may give different answers than the OMS.
297    pub async fn list_symbols(&self, marketdata: Option<&str>) -> Result<Vec<String>> {
298        let channel = match marketdata {
299            Some(venue) => self.marketdata(venue)?,
300            None => self.core.clone(),
301        };
302        let mut client = SymbologyClient::new(channel);
303        let req = SymbolsRequest {};
304        let req = self.with_jwt(req).await?;
305        let res = client.symbols(req).await?;
306        let symbols = res.into_inner().symbols;
307        Ok(symbols)
308    }
309
310    pub async fn get_futures_series(
311        &self,
312        series_symbol: impl AsRef<str>,
313        include_expired: bool,
314    ) -> Result<Vec<Product>> {
315        let mut client = SymbologyClient::new(self.core.clone());
316        let req = FuturesSeriesRequest {
317            series_symbol: series_symbol.as_ref().to_string(),
318            include_expired,
319        };
320        let req = self.with_jwt(req).await?;
321        let res = client.futures_series(req).await?;
322        Ok(res.into_inner().futures)
323    }
324
325    /// Get execution information for a tradable product at a specific venue.
326    ///
327    /// Returns execution details like tick size, step size, minimum order quantity,
328    /// margin requirements, and other venue-specific trading parameters.
329    ///
330    /// The symbol must be a TradableProduct (e.g., "ES 20250620 CME Future/USD").
331    /// Note that this symbol has the format {base}/{quote}, where the quote will generally be USD.
332    pub async fn get_execution_info(
333        &self,
334        symbol: impl AsRef<str>,
335        execution_venue: Option<ExecutionVenue>,
336    ) -> Result<ExecutionInfoResponse> {
337        let symbol = symbol.as_ref();
338
339        let channel = self.core.clone();
340        let mut client = SymbologyClient::new(channel);
341
342        let req = ExecutionInfoRequest { symbol: symbol.to_string(), execution_venue };
343        let req = self.with_jwt(req).await?;
344        let res = client.execution_info(req).await?;
345        Ok(res.into_inner())
346    }
347
348    // ------------------------------------------------------------
349    // Marketdata
350    // ------------------------------------------------------------
351
352    pub async fn get_market_status(
353        &self,
354        symbol: impl AsRef<str>,
355        venue: impl AsRef<str>,
356    ) -> Result<MarketStatus> {
357        let symbol = symbol.as_ref();
358        let venue = venue.as_ref();
359        let channel = self.marketdata(venue)?;
360        let mut client = MarketdataClient::new(channel);
361        let req =
362            MarketStatusRequest { symbol: symbol.to_string(), venue: Some(venue.into()) };
363        let req = self.with_jwt(req).await?;
364        let res = client.market_status(req).await?;
365        Ok(res.into_inner())
366    }
367
368    pub async fn get_historical_candles(
369        &self,
370        symbol: impl AsRef<str>,
371        venue: impl AsRef<str>,
372        candle_width: CandleWidth,
373        start_date: DateTime<Utc>,
374        end_date: DateTime<Utc>,
375    ) -> Result<Vec<Candle>> {
376        use crate::marketdata::synthetic_candles::{
377            synthesize_2m_candles, synthesize_3m_candles,
378        };
379
380        let symbol = symbol.as_ref();
381        let venue = venue.as_ref();
382
383        // For 2m/3m candles, fetch 1m candles and compose them client-side
384        let (actual_width, needs_composition) = match candle_width {
385            CandleWidth::TwoMinute | CandleWidth::ThreeMinute => {
386                (CandleWidth::OneMinute, true)
387            }
388            _ => (candle_width, false),
389        };
390
391        // Special handling for US-EQUITIES: route to marketdata channel instead of hmart
392        // if this branch is changed, fix the graphql query and python sdk function as well
393        let (channel, venue_param) = if venue == "US-EQUITIES" {
394            (self.marketdata(venue)?, None)
395        } else {
396            (self.hmart.clone(), Some(venue.into()))
397        };
398
399        let mut client = MarketdataClient::new(channel);
400        let req = HistoricalCandlesRequest {
401            symbol: symbol.to_string(),
402            venue: venue_param,
403            candle_width: actual_width,
404            start_date,
405            end_date,
406        };
407        let req = self.with_jwt(req).await?;
408        let res = client.historical_candles(req).await?;
409        let candles = res.into_inner().candles;
410
411        // Compose if needed
412        if needs_composition {
413            match candle_width {
414                CandleWidth::TwoMinute => Ok(synthesize_2m_candles(candles)),
415                CandleWidth::ThreeMinute => Ok(synthesize_3m_candles(candles)),
416                _ => unreachable!(),
417            }
418        } else {
419            Ok(candles)
420        }
421    }
422
423    pub async fn get_l1_book_snapshot(
424        &self,
425        symbol: impl AsRef<str>,
426        venue: impl AsRef<str>,
427    ) -> Result<L1BookSnapshot> {
428        let symbol = symbol.as_ref();
429        let venue = venue.as_ref();
430        let channel = self.marketdata(venue)?;
431        let mut client = MarketdataClient::new(channel);
432        let req = L1BookSnapshotRequest {
433            symbol: symbol.to_string(),
434            venue: Some(venue.into()),
435        };
436        let req = self.with_jwt(req).await?;
437        let res = client.l1_book_snapshot(req).await?;
438        Ok(res.into_inner())
439    }
440
441    pub async fn get_l1_book_snapshots(
442        &self,
443        symbols: impl IntoIterator<Item = impl AsRef<str>>,
444        venue: impl AsRef<str>,
445    ) -> Result<Vec<L1BookSnapshot>> {
446        let symbols =
447            symbols.into_iter().map(|s| s.as_ref().to_string()).collect::<Vec<_>>();
448        let venue = venue.as_ref();
449        let channel = self.marketdata(venue)?;
450        let mut client = MarketdataClient::new(channel);
451        let req =
452            L1BookSnapshotsRequest { symbols: Some(symbols), venue: Some(venue.into()) };
453        let req = self.with_jwt(req).await?;
454        let res = client.l1_book_snapshots(req).await?;
455        Ok(res.into_inner())
456    }
457
458    pub async fn get_l2_book_snapshot(
459        &self,
460        symbol: impl AsRef<str>,
461        venue: impl AsRef<str>,
462    ) -> Result<L2BookSnapshot> {
463        let symbol = symbol.as_ref();
464        let venue = venue.as_ref();
465        let channel = self.marketdata(venue)?;
466        let mut client = MarketdataClient::new(channel);
467        let req = L2BookSnapshotRequest {
468            symbol: symbol.to_string(),
469            venue: Some(venue.into()),
470        };
471        let req = self.with_jwt(req).await?;
472        let res = client.l2_book_snapshot(req).await?;
473        Ok(res.into_inner())
474    }
475
476    pub async fn get_ticker(
477        &self,
478        symbol: impl AsRef<str>,
479        venue: impl AsRef<str>,
480    ) -> Result<Ticker> {
481        let symbol = symbol.as_ref();
482        let venue = venue.as_ref();
483        let channel = self.marketdata(venue)?;
484        let mut client = MarketdataClient::new(channel);
485        let req = TickerRequest { symbol: symbol.to_string(), venue: Some(venue.into()) };
486        let req = self.with_jwt(req).await?;
487        let res = client.ticker(req).await?;
488        Ok(res.into_inner())
489    }
490
491    pub async fn get_tickers(
492        &self,
493        venue: impl AsRef<str>,
494        options: GetTickersOptions,
495        sort_tickers_by: Option<SortTickersBy>,
496        offset: Option<i32>,
497        limit: Option<i32>,
498    ) -> Result<Vec<Ticker>> {
499        let venue = venue.as_ref();
500        let channel = self.marketdata(venue)?;
501        let mut client = MarketdataClient::new(channel);
502        let req = TickersRequest {
503            symbols: options.symbols,
504            venue: Some(venue.into()),
505            pagination: OffsetAndLimit { offset, limit, sort_by: sort_tickers_by },
506            include_options: Some(options.include_options),
507        };
508        let req = self.with_jwt(req).await?;
509        let res = client.tickers(req).await?;
510        Ok(res.into_inner().tickers)
511    }
512
513    pub async fn stream_l1_book_snapshots(
514        &self,
515        symbols: impl IntoIterator<Item = impl AsRef<str>>,
516        venue: impl AsRef<str>,
517        send_initial_snapshots: bool,
518    ) -> Result<Streaming<L1BookSnapshot>> {
519        let symbols =
520            symbols.into_iter().map(|s| s.as_ref().to_string()).collect::<Vec<_>>();
521        let venue = venue.as_ref();
522        let channel = self.marketdata(venue)?;
523        let mut client = MarketdataClient::new(channel);
524        let req = SubscribeL1BookSnapshotsRequest {
525            symbols: Some(symbols),
526            venue: Some(venue.into()),
527            send_initial_snapshots,
528        };
529        let req = self.with_jwt(req).await?;
530        let res = client.subscribe_l1_book_snapshots(req).await?;
531        Ok(res.into_inner())
532    }
533
534    pub async fn stream_l2_book_updates(
535        &self,
536        symbol: impl AsRef<str>,
537        venue: impl AsRef<str>,
538    ) -> Result<Streaming<L2BookUpdate>> {
539        let symbol = symbol.as_ref();
540        let venue = venue.as_ref();
541        let channel = self.marketdata(venue)?;
542        let mut client = MarketdataClient::new(channel);
543        let req = SubscribeL2BookUpdatesRequest {
544            symbol: symbol.to_string(),
545            venue: Some(venue.into()),
546        };
547        let req = self.with_jwt(req).await?;
548        let res = client.subscribe_l2_book_updates(req).await?;
549        Ok(res.into_inner())
550    }
551
552    pub async fn stream_trades(
553        &self,
554        symbol: Option<impl AsRef<str>>,
555        venue: impl AsRef<str>,
556    ) -> Result<Streaming<Trade>> {
557        let symbol = symbol.as_ref();
558        let venue = venue.as_ref();
559        let channel = self.marketdata(venue)?;
560        let mut client = MarketdataClient::new(channel);
561        let req = SubscribeTradesRequest {
562            symbol: symbol.map(|s| s.as_ref().to_string()),
563            venue: Some(venue.into()),
564        };
565        let req = self.with_jwt(req).await?;
566        let res = client.subscribe_trades(req).await?;
567        Ok(res.into_inner())
568    }
569
570    pub async fn stream_candles(
571        &self,
572        symbol: impl AsRef<str>,
573        venue: impl AsRef<str>,
574        candle_widths: Option<impl IntoIterator<Item = &CandleWidth>>,
575    ) -> Result<Streaming<Candle>> {
576        let symbol = symbol.as_ref();
577        let venue = venue.as_ref();
578        let channel = self.marketdata(venue)?;
579        let mut client = MarketdataClient::new(channel);
580        let req = SubscribeCandlesRequest {
581            symbol: symbol.to_string(),
582            venue: Some(venue.into()),
583            candle_widths: candle_widths.map(|c| c.into_iter().copied().collect()),
584        };
585        let req = self.with_jwt(req).await?;
586        let res = client.subscribe_candles(req).await?;
587        Ok(res.into_inner())
588    }
589
590    // ------------------------------------------------------------
591    // Portfolio management
592    // ------------------------------------------------------------
593
594    pub async fn list_accounts(
595        &self,
596        trader: Option<TraderIdOrEmail>,
597    ) -> Result<Vec<AccountWithPermissions>> {
598        let mut client = AccountsClient::new(self.core.clone());
599        let req = AccountsRequest { paper: self.paper_trading, trader };
600        let req = self.with_jwt(req).await?;
601        let res = client.accounts(req).await?;
602        Ok(res.into_inner().accounts)
603    }
604
605    pub async fn get_account_summary(
606        &self,
607        account: AccountIdOrName,
608    ) -> Result<AccountSummary> {
609        let mut client = FolioClient::new(self.core.clone());
610        let req = AccountSummaryRequest { account };
611        let req = self.with_jwt(req).await?;
612        let res = client.account_summary(req).await?;
613        Ok(res.into_inner())
614    }
615
616    pub async fn get_account_summaries(
617        &self,
618        account: Option<impl IntoIterator<Item = AccountIdOrName>>,
619        trader: Option<TraderIdOrEmail>,
620    ) -> Result<Vec<AccountSummary>> {
621        let mut client = FolioClient::new(self.core.clone());
622        let req = AccountSummariesRequest {
623            accounts: account.map(|a| a.into_iter().collect()),
624            trader,
625        };
626        let req = self.with_jwt(req).await?;
627        let res = client.account_summaries(req).await?;
628        Ok(res.into_inner().account_summaries)
629    }
630
631    pub async fn get_account_history(
632        &self,
633        account: AccountIdOrName,
634        from_inclusive: Option<DateTime<Utc>>,
635        to_exclusive: Option<DateTime<Utc>>,
636        granularity: Option<AccountHistoryGranularity>,
637        limit: Option<i32>,
638        time_of_day: Option<NaiveTime>,
639    ) -> Result<Vec<AccountSummary>> {
640        let mut client = FolioClient::new(self.core.clone());
641        let req = AccountHistoryRequest {
642            account,
643            from_inclusive,
644            to_exclusive,
645            granularity,
646            limit,
647            time_of_day,
648        };
649        let req = self.with_jwt(req).await?;
650        let res = client.account_history(req).await?;
651        Ok(res.into_inner().history)
652    }
653
654    // ------------------------------------------------------------
655    // Order management
656    // ------------------------------------------------------------
657
658    pub async fn get_open_orders(
659        &self,
660        order_ids: Option<impl IntoIterator<Item = &OrderId>>,
661        venue: Option<impl AsRef<str>>,
662        account: Option<AccountIdOrName>,
663        trader: Option<TraderIdOrEmail>,
664        symbol: Option<impl AsRef<str>>,
665        parent_order_id: Option<OrderId>,
666    ) -> Result<Vec<Order>> {
667        let mut client = OmsClient::new(self.core.clone());
668        let req = OpenOrdersRequest {
669            order_ids: order_ids.map(|o| o.into_iter().copied().collect()),
670            venue: venue.map(|v| v.as_ref().into()),
671            account,
672            trader,
673            symbol: symbol.map(|s| s.as_ref().to_string()),
674            parent_order_id,
675            from_inclusive: None,
676            to_exclusive: None,
677            limit: None,
678        };
679        let req = self.with_jwt(req).await?;
680        let res = client.open_orders(req).await?;
681        Ok(res.into_inner().open_orders)
682    }
683
684    pub async fn get_all_open_orders(&self) -> Result<Vec<Order>> {
685        self.get_open_orders(
686            None::<&[OrderId]>,
687            None::<&str>,
688            None,
689            None,
690            None::<&str>,
691            None,
692        )
693        .await
694    }
695
696    pub async fn get_historical_orders(
697        &self,
698        query: HistoricalOrdersRequest,
699    ) -> Result<Vec<Order>> {
700        let mut client = FolioClient::new(self.core.clone());
701        let req = self.with_jwt(query).await?;
702        let res = client.historical_orders(req).await?;
703        Ok(res.into_inner().orders)
704    }
705
706    pub async fn get_fills(&self, query: HistoricalFillsRequest) -> Result<Vec<Fill>> {
707        let mut client = FolioClient::new(self.core.clone());
708        let req = self.with_jwt(query).await?;
709        let res = client.historical_fills(req).await?;
710        Ok(res.into_inner().fills)
711    }
712
713    /// Create a bidirectional orderflow stream.
714    ///
715    /// This returns the raw bidirectional stream from the gRPC service.
716    /// You can send `OrderflowRequest` messages and receive `Orderflow` updates.
717    ///
718    /// For most use cases, consider using `place_order` and `cancel_order` methods
719    /// directly on the client instead.
720    ///
721    /// # Example
722    ///
723    /// ```rust,no_run
724    /// # use architect_sdk::Architect;
725    /// # use architect_api::orderflow::{OrderflowRequest, Orderflow, OrderType};
726    /// # use architect_api::oms::PlaceOrderRequest;
727    /// # use architect_api::Dir;
728    /// # #[tokio::main]
729    /// # async fn main() -> anyhow::Result<()> {
730    /// # let client = Architect::connect("key", "secret", true).await?;
731    /// use tokio_stream::StreamExt;
732    ///
733    /// let (tx, rx) = tokio::sync::mpsc::channel(100);
734    /// let request_stream = tokio_stream::wrappers::ReceiverStream::new(rx);
735    /// let mut response_stream = client.orderflow(request_stream).await?;
736    ///
737    /// // Send orders through tx
738    /// let order = PlaceOrderRequest {
739    ///     id: None,
740    ///     parent_id: None,
741    ///     symbol: "BTC-USD".to_string(),
742    ///     dir: Dir::Buy,
743    ///     quantity: "0.01".parse()?,
744    ///     trader: None,
745    ///     account: None,
746    ///     order_type: OrderType::Market,
747    ///     time_in_force: architect_api::orderflow::TimeInForce::GoodTilCancel,
748    ///     source: None,
749    ///     execution_venue: None,
750    /// };
751    /// tx.send(OrderflowRequest::PlaceOrder(order)).await?;
752    ///
753    /// // Receive updates
754    /// while let Some(result) = response_stream.next().await {
755    ///     match result {
756    ///         Ok(update) => println!("Update: {:?}", update),
757    ///         Err(e) => eprintln!("Error: {}", e),
758    ///     }
759    /// }
760    /// # Ok(())
761    /// # }
762    /// ```
763    pub async fn orderflow<S>(
764        &self,
765        request_stream: S,
766    ) -> Result<tonic::codec::Streaming<Orderflow>>
767    where
768        S: futures::Stream<Item = OrderflowRequest> + Send + 'static,
769    {
770        let mut client = OrderflowClient::new(self.core.clone());
771        let req = self.with_jwt(request_stream).await?;
772        let res = client.orderflow(req).await?;
773        Ok(res.into_inner())
774    }
775
776    // ------------------------------------------------------------
777    // Order entry
778    // ------------------------------------------------------------
779
780    pub async fn place_order(&self, place_order: PlaceOrderRequest) -> Result<Order> {
781        let mut client = OmsClient::new(self.core.clone());
782        let req = self.with_jwt(place_order).await?;
783        let res = client.place_order(req).await?;
784        Ok(res.into_inner())
785    }
786
787    pub async fn cancel_order(&self, cancel_order: CancelOrderRequest) -> Result<Cancel> {
788        let mut client = OmsClient::new(self.core.clone());
789        let req = self.with_jwt(cancel_order).await?;
790        let res = client.cancel_order(req).await?;
791        Ok(res.into_inner())
792    }
793}
794
795#[derive(Default, Debug, Clone)]
796pub struct GetTickersOptions {
797    pub symbols: Option<Vec<String>>,
798    pub include_options: bool,
799}
800
801#[cfg(test)]
802mod tests {
803    use super::*;
804
805    /// Test deterministic endpoint resolution
806    #[tokio::test]
807    async fn test_resolve_endpoint() -> Result<()> {
808        let e = Architect::resolve_endpoint("127.0.0.1:8081", false).await?;
809        assert_eq!(e.uri().to_string(), "http://127.0.0.1:8081/");
810        let e = Architect::resolve_endpoint("https://localhost:8081", false).await?;
811        assert_eq!(e.uri().to_string(), "https://localhost:8081/");
812        Ok(())
813    }
814}