use std::sync::Arc;
use chrono::{DateTime, Utc};
use conjure_http::client::{AsyncService, ConjureRuntime};
use conjure_object::BearerToken;
use conjure_runtime::Client;
use futures::Stream;
use nominal_api::clients::scout::{AsyncRunService, AsyncRunServiceClient};
use nominal_api::objects::api::{Label, PropertyName, PropertyValue, SetOperator};
use nominal_api::objects::scout::rids::api::{LabelsFilter, PropertiesFilter};
use nominal_api::objects::scout::run::api::{
CreateRunDataSource, CustomTimeframeFilter, SearchQuery, SearchRunsRequest, SearchRunsResponse,
SortField, SortKey, SortOptions, TimeframeFilter, UpdateAttachmentsRequest, UpdateRunRequest,
};
use std::collections::{BTreeMap, BTreeSet, HashMap};
use crate::core::{
datasource::DataSource,
datetime::{NominalDateTime, api_timestamp_to_utc_or_panic},
rid::{parse_rid, rid_to_string},
utils::paginate_stream,
};
use crate::{Error, Result};
use futures::TryStreamExt;
/// Client-side view of a run, built from the conjure API `Run` object by
/// `Run::from_conjure`.
///
/// All fields are private; they are exposed read-only through the accessor
/// methods on `Run`.
#[derive(Debug, Clone)]
pub struct Run {
/// Run RID rendered as a string.
rid: String,
/// Run name, taken from the API object's title.
name: String,
/// Free-form description of the run.
description: String,
/// Property name/value pairs, both converted to plain strings.
properties: HashMap<String, String>,
/// Labels attached to the run, converted to plain strings.
labels: Vec<String>,
/// Start time of the run in UTC.
start: DateTime<Utc>,
/// End time of the run in UTC; `None` when the API object has no end time.
end: Option<DateTime<Utc>>,
/// Run number; also used as the final path segment of `nominal_url`.
run_number: u32,
/// RIDs of the run's assets, rendered as strings.
assets: Vec<String>,
/// Data sources keyed by ref name. Entries that `DataSource::from_conjure`
/// does not convert are left out.
data_sources: HashMap<String, DataSource>,
/// Creation timestamp of the run in UTC.
created_at: DateTime<Utc>,
/// Base URL used as the prefix when building `nominal_url`.
app_base_url: String,
}
impl Run {
    /// Returns the run's RID in string form.
    pub fn rid(&self) -> &str {
        self.rid.as_str()
    }

    /// Returns the run's name (the title of the underlying API object).
    pub fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Returns the run's description.
    pub fn description(&self) -> &str {
        self.description.as_str()
    }

    /// Returns the run's properties as a name-to-value map.
    pub fn properties(&self) -> &HashMap<String, String> {
        &self.properties
    }

    /// Returns the run's labels.
    pub fn labels(&self) -> &[String] {
        self.labels.as_slice()
    }

    /// Returns the run's start time.
    pub fn start(&self) -> &DateTime<Utc> {
        &self.start
    }

    /// Returns the run's end time, or `None` if it has none.
    pub fn end(&self) -> Option<&DateTime<Utc>> {
        self.end.as_ref()
    }

    /// Returns the run number.
    pub fn run_number(&self) -> u32 {
        self.run_number
    }

    /// Returns the RIDs of the assets associated with the run.
    pub fn assets(&self) -> &[String] {
        self.assets.as_slice()
    }

    /// Returns the run's data sources keyed by ref name.
    pub fn data_sources(&self) -> &HashMap<String, DataSource> {
        &self.data_sources
    }

    /// Returns the run's creation timestamp.
    pub fn created_at(&self) -> &DateTime<Utc> {
        &self.created_at
    }

    /// Builds the URL of this run: `<app_base_url>/runs/<run_number>`.
    pub fn nominal_url(&self) -> String {
        let Self {
            app_base_url,
            run_number,
            ..
        } = self;
        format!("{app_base_url}/runs/{run_number}")
    }

    /// Converts a conjure API run into the client-side [`Run`].
    ///
    /// `app_base_url` is stored so that [`Run::nominal_url`] can be built
    /// later. Data sources for which `DataSource::from_conjure` yields
    /// nothing are dropped from the resulting map.
    ///
    /// Timestamp conversion goes through `api_timestamp_to_utc_or_panic`.
    pub(crate) fn from_conjure(
        run: nominal_api::objects::scout::run::api::Run,
        app_base_url: &str,
    ) -> Self {
        let properties = run
            .properties()
            .iter()
            .map(|(key, value)| (key.to_string(), value.to_string()))
            .collect();
        let labels = run.labels().iter().map(|label| label.to_string()).collect();
        let assets = run.assets().iter().map(rid_to_string).collect();

        // Keep only the data sources that have a client-side representation.
        let data_sources = run
            .data_sources()
            .iter()
            .filter_map(|(ref_name, run_source)| {
                let converted = DataSource::from_conjure(run_source.data_source())?;
                Some((ref_name.to_string(), converted))
            })
            .collect();

        let start = api_timestamp_to_utc_or_panic(run.start_time());
        let end = run.end_time().map(api_timestamp_to_utc_or_panic);

        Self {
            rid: rid_to_string(run.rid()),
            name: run.title().to_string(),
            description: run.description().to_string(),
            properties,
            labels,
            start,
            end,
            run_number: *run.run_number() as u32,
            assets,
            data_sources,
            created_at: run.created_at().to_utc(),
            app_base_url: app_base_url.to_string(),
        }
    }
}
/// Builder describing a partial update to a run.
///
/// Every field defaults to `None`. `into_request` only sets the request
/// fields whose value here is `Some`.
#[derive(Debug, Default, Clone)]
pub struct RunUpdate {
/// New name; sent as the request's title.
name: Option<String>,
/// New description.
description: Option<String>,
/// New property map.
properties: Option<HashMap<String, String>>,
/// New labels; collected into a set when the request is built, so
/// duplicates collapse.
labels: Option<Vec<String>>,
/// New start time.
start: Option<DateTime<Utc>>,
/// New end time.
end: Option<DateTime<Utc>>,
}
impl RunUpdate {
#[must_use]
pub fn new() -> Self {
Self::default()
}
#[must_use]
pub fn name(mut self, value: impl Into<String>) -> Self {
self.name = Some(value.into());
self
}
#[must_use]
pub fn description(mut self, value: impl Into<String>) -> Self {
self.description = Some(value.into());
self
}
#[must_use]
pub fn properties<I, K, V>(mut self, value: I) -> Self
where
I: IntoIterator<Item = (K, V)>,
K: Into<String>,
V: Into<String>,
{
self.properties = Some(
value
.into_iter()
.map(|(k, v)| (k.into(), v.into()))
.collect(),
);
self
}
#[must_use]
pub fn labels<I>(mut self, value: I) -> Self
where
I: IntoIterator,
I::Item: Into<String>,
{
self.labels = Some(value.into_iter().map(Into::into).collect());
self
}
#[must_use]
pub fn start(mut self, value: DateTime<Utc>) -> Self {
self.start = Some(value);
self
}
#[must_use]
pub fn end(mut self, value: DateTime<Utc>) -> Self {
self.end = Some(value);
self
}
pub(crate) fn into_request(self) -> Result<UpdateRunRequest> {
let RunUpdate {
name,
description,
properties,
labels,
start,
end,
} = self;
let mut b = UpdateRunRequest::builder();
if let Some(n) = name {
b = b.title(n);
}
if let Some(d) = description {
b = b.description(d);
}
if let Some(p) = properties {
let props = p
.into_iter()
.map(|(k, v)| (k.into(), v.into()))
.collect::<BTreeMap<_, _>>();
b = b.properties(props);
}
if let Some(l) = labels {
b = b.labels(l.into_iter().map(|s| s.into()).collect::<BTreeSet<_>>());
}
if let Some(s) = start {
b = b.start_time(Some(NominalDateTime::try_from(s)?.into()));
}
if let Some(e) = end {
b = b.end_time(Some(NominalDateTime::try_from(e)?.into()));
}
Ok(b.assets(vec![]).build())
}
}
/// Composable query used to search for runs.
///
/// Converted to the conjure `SearchQuery` by `RunQuery::into_conjure`.
#[derive(Debug, Clone)]
pub enum RunQuery {
/// Free-text search; converted to `SearchQuery::SearchText`.
SearchText(String),
/// Sent to the API as `SearchQuery::ExactMatch`. When this variant appears at
/// the top level or nested only under `And`, `RunsClient::search` also
/// filters the results client-side on the run name.
SubstringMatch(String),
/// Matches a single label (labels filter built with `SetOperator::Or`).
Label(String),
/// Matches a property by name (first field) and value (second field).
Property(String, String),
/// Matches a run number.
RunNumber(u32),
/// Start-time filter: a custom timeframe whose `start_time` is this value.
StartTimeInclusive(DateTime<Utc>),
/// End-time filter: a custom timeframe whose `end_time` is this value.
EndTimeInclusive(DateTime<Utc>),
/// Conjunction of sub-queries.
And(Vec<RunQuery>),
/// Disjunction of sub-queries.
Or(Vec<RunQuery>),
/// Negation of a sub-query.
Not(Box<RunQuery>),
}
impl RunQuery {
    /// Builds a free-text search query.
    pub fn search_text(text: impl Into<String>) -> Self {
        Self::SearchText(text.into())
    }

    /// Builds a substring-match query.
    pub fn substring_match(text: impl Into<String>) -> Self {
        Self::SubstringMatch(text.into())
    }

    /// Builds a query matching a single label.
    pub fn label(label: impl Into<String>) -> Self {
        Self::Label(label.into())
    }

    /// Builds a query matching a property name/value pair.
    pub fn property(key: impl Into<String>, value: impl Into<String>) -> Self {
        Self::Property(key.into(), value.into())
    }

    /// Builds a query matching a run number.
    pub fn run_number(n: u32) -> Self {
        Self::RunNumber(n)
    }

    /// Builds a start-time filter with `t` as the lower bound.
    pub fn start_time_inclusive(t: DateTime<Utc>) -> Self {
        Self::StartTimeInclusive(t)
    }

    /// Builds an end-time filter with `t` as the upper bound.
    pub fn end_time_inclusive(t: DateTime<Utc>) -> Self {
        Self::EndTimeInclusive(t)
    }

    /// Builds the conjunction of `queries`.
    pub fn and(queries: impl IntoIterator<Item = RunQuery>) -> Self {
        Self::And(queries.into_iter().collect())
    }

    /// Builds the disjunction of `queries`.
    pub fn or(queries: impl IntoIterator<Item = RunQuery>) -> Self {
        Self::Or(queries.into_iter().collect())
    }

    /// Builds the negation of `query`.
    // This is an associated constructor taking the query as an argument, not
    // `std::ops::Not::not(self)`, hence the lint exemption.
    #[allow(clippy::should_implement_trait)]
    pub fn not(query: RunQuery) -> Self {
        Self::Not(Box::new(query))
    }

    /// Returns the text of every `SubstringMatch` that is either the query
    /// itself or reachable through `And` nodes only.
    ///
    /// Substring terms under `Or` or `Not` are not collected.
    pub(crate) fn collect_substring_matches(&self) -> Vec<String> {
        let mut out = Vec::new();
        self.collect_substring_matches_into(&mut out);
        out
    }

    /// Recursive worker for [`RunQuery::collect_substring_matches`].
    fn collect_substring_matches_into(&self, out: &mut Vec<String>) {
        match self {
            Self::SubstringMatch(s) => out.push(s.clone()),
            Self::And(qs) => qs
                .iter()
                .for_each(|q| q.collect_substring_matches_into(out)),
            // Listed explicitly (no `_` arm) so that adding a variant forces a
            // decision here about whether it contributes substring terms.
            Self::SearchText(_)
            | Self::Label(_)
            | Self::Property(_, _)
            | Self::RunNumber(_)
            | Self::StartTimeInclusive(_)
            | Self::EndTimeInclusive(_)
            | Self::Or(_)
            | Self::Not(_) => {}
        }
    }

    /// Converts this query into the conjure `SearchQuery`.
    ///
    /// `SubstringMatch` is sent as `SearchQuery::ExactMatch`; `And`, `Or` and
    /// `Not` convert their children recursively.
    ///
    /// # Errors
    ///
    /// Returns an error if a timestamp in the query (at any depth) cannot be
    /// converted with `NominalDateTime::try_from`.
    fn into_conjure(self) -> crate::Result<SearchQuery> {
        use crate::core::datetime::NominalDateTime;
        Ok(match self {
            Self::SearchText(s) => SearchQuery::SearchText(s),
            Self::SubstringMatch(s) => SearchQuery::ExactMatch(s),
            Self::Label(l) => SearchQuery::Labels(
                LabelsFilter::builder()
                    .operator(SetOperator::Or)
                    .extend_labels([Label(l)])
                    .build(),
            ),
            Self::Property(k, v) => SearchQuery::Properties(
                PropertiesFilter::builder()
                    .name(PropertyName(k))
                    .extend_values([PropertyValue(v)])
                    .build(),
            ),
            Self::RunNumber(n) => SearchQuery::RunNumber(
                // `i64::from` makes the lossless u32 -> i64 widening explicit.
                conjure_object::SafeLong::try_from(i64::from(n))
                    .expect("u32 is always within SafeLong range"),
            ),
            Self::StartTimeInclusive(t) => {
                let ts = NominalDateTime::try_from(t)?.into();
                SearchQuery::StartTime(Box::new(TimeframeFilter::Custom(
                    CustomTimeframeFilter::builder()
                        .start_time(Some(ts))
                        .build(),
                )))
            }
            Self::EndTimeInclusive(t) => {
                let ts = NominalDateTime::try_from(t)?.into();
                SearchQuery::EndTime(Box::new(TimeframeFilter::Custom(
                    CustomTimeframeFilter::builder().end_time(Some(ts)).build(),
                )))
            }
            // The first failing child aborts the whole conversion.
            Self::And(qs) => SearchQuery::And(
                qs.into_iter()
                    .map(Self::into_conjure)
                    .collect::<crate::Result<_>>()?,
            ),
            Self::Or(qs) => SearchQuery::Or(
                qs.into_iter()
                    .map(Self::into_conjure)
                    .collect::<crate::Result<_>>()?,
            ),
            Self::Not(q) => SearchQuery::Not(Box::new(q.into_conjure()?)),
        })
    }
}
/// Client for run operations, wrapping the async conjure run service.
pub struct RunsClient {
/// Conjure client for the run service.
service: AsyncRunServiceClient<Client>,
/// Bearer token passed to every service call made by this client.
token: BearerToken,
/// Base URL handed to `Run::from_conjure` when converting API responses.
app_base_url: String,
}
impl RunsClient {
pub(crate) fn new(
client: Client,
runtime: &Arc<ConjureRuntime>,
token: BearerToken,
app_base_url: String,
) -> Self {
Self {
service: AsyncRunServiceClient::new(client, runtime),
token,
app_base_url,
}
}
pub async fn get(&self, rid: &str) -> Result<Run> {
let run_rid = parse_rid(rid)?;
let response = self
.service
.get_run(&self.token, &run_rid)
.await
.map_err(Error::from)?;
Ok(Run::from_conjure(response, &self.app_base_url))
}
pub async fn get_batch<I, S>(&self, rids: I) -> Result<HashMap<String, Run>>
where
I: IntoIterator<Item = S>,
S: AsRef<str>,
{
let rid_set = rids
.into_iter()
.map(|s| parse_rid(s.as_ref()).map_err(Error::from))
.collect::<Result<std::collections::BTreeSet<_>>>()?;
let response = self
.service
.get_runs(&self.token, &rid_set)
.await
.map_err(Error::from)?;
Ok(response
.into_iter()
.map(|(k, v)| (rid_to_string(&k), Run::from_conjure(v, &self.app_base_url)))
.collect())
}
pub async fn list(&self) -> Result<Vec<Run>> {
self.search(RunQuery::search_text("")).await
}
fn search_stream(&self, query: RunQuery) -> Result<impl Stream<Item = Result<Run>>> {
let conjure_query = query.into_conjure()?;
let service = self.service.clone();
let token = self.token.clone();
let app_base_url = self.app_base_url.clone();
Ok(paginate_stream(
move |page_token| {
SearchRunsRequest::builder()
.sort(
SortOptions::builder()
.is_descending(true)
.sort_key(SortKey::Field(SortField::CreatedAt))
.build(),
)
.page_size(100)
.query(conjure_query.clone())
.next_page_token(page_token)
.build()
},
move |req| {
let service = service.clone();
let token = token.clone();
async move { service.search_runs(&token, &req).await.map_err(Error::from) }
},
|resp: &SearchRunsResponse| resp.next_page_token().cloned(),
move |resp| {
resp.results()
.iter()
.map(|r| Run::from_conjure(r.clone(), &app_base_url))
.collect()
},
))
}
pub async fn search(&self, query: RunQuery) -> Result<Vec<Run>> {
let substrings = query.collect_substring_matches();
let runs: Vec<Run> = self.search_stream(query)?.try_collect().await?;
Ok(runs
.into_iter()
.filter(|r| crate::core::utils::name_matches_all(r.name(), &substrings))
.collect())
}
pub async fn update(&self, rid: &str, update: RunUpdate) -> Result<Run> {
let request = update.into_request()?;
let run_rid = parse_rid(rid)?;
let response = self
.service
.update_run(&self.token, &run_rid, &request)
.await
.map_err(Error::from)?;
Ok(Run::from_conjure(response, &self.app_base_url))
}
pub async fn add_data_sources<I, N>(&self, rid: &str, sources: I) -> Result<Run>
where
I: IntoIterator<Item = (N, DataSource)>,
N: Into<String>,
{
let data_sources = sources
.into_iter()
.map(|(ref_name, ds)| {
ds.into_conjure().map(|conjure_ds| {
(
ref_name.into().into(),
CreateRunDataSource::builder()
.data_source(conjure_ds)
.build(),
)
})
})
.collect::<Result<BTreeMap<_, _>>>()?;
let run_rid = parse_rid(rid)?;
let response = self
.service
.add_data_sources_to_run(&self.token, &run_rid, &data_sources)
.await
.map_err(Error::from)?;
Ok(Run::from_conjure(response, &self.app_base_url))
}
pub async fn add_dataset(&self, rid: &str, ref_name: &str, dataset_rid: &str) -> Result<Run> {
self.add_data_sources(rid, [(ref_name, DataSource::dataset(dataset_rid))])
.await
}
pub async fn add_video(&self, rid: &str, ref_name: &str, video_rid: &str) -> Result<Run> {
self.add_data_sources(rid, [(ref_name, DataSource::video(video_rid))])
.await
}
pub async fn add_connection(
&self,
rid: &str,
ref_name: &str,
connection_rid: &str,
) -> Result<Run> {
self.add_data_sources(rid, [(ref_name, DataSource::connection(connection_rid))])
.await
}
pub async fn add_attachments<I, S>(&self, rid: &str, attachment_rids: I) -> Result<()>
where
I: IntoIterator<Item = S>,
S: AsRef<str>,
{
let attachments_to_add = attachment_rids
.into_iter()
.map(|s| parse_rid(s.as_ref()).map_err(Error::from))
.collect::<Result<Vec<_>>>()?;
let request = UpdateAttachmentsRequest::builder()
.attachments_to_add(attachments_to_add)
.attachments_to_remove(vec![])
.build();
let run_rid = parse_rid(rid)?;
self.service
.update_run_attachment(&self.token, &run_rid, &request)
.await
.map_err(Error::from)?;
Ok(())
}
pub async fn remove_attachments<I, S>(&self, rid: &str, attachment_rids: I) -> Result<()>
where
I: IntoIterator<Item = S>,
S: AsRef<str>,
{
let attachments_to_remove = attachment_rids
.into_iter()
.map(|s| parse_rid(s.as_ref()).map_err(Error::from))
.collect::<Result<Vec<_>>>()?;
let request = UpdateAttachmentsRequest::builder()
.attachments_to_add(vec![])
.attachments_to_remove(attachments_to_remove)
.build();
let run_rid = parse_rid(rid)?;
self.service
.update_run_attachment(&self.token, &run_rid, &request)
.await
.map_err(Error::from)?;
Ok(())
}
pub async fn archive(&self, rid: &str) -> Result<()> {
let run_rid = parse_rid(rid)?;
self.service
.archive_run(&self.token, &run_rid, None)
.await
.map_err(Error::from)?;
Ok(())
}
pub async fn unarchive(&self, rid: &str) -> Result<()> {
let run_rid = parse_rid(rid)?;
self.service
.unarchive_run(&self.token, &run_rid, None)
.await
.map_err(Error::from)?;
Ok(())
}
}
// Unit tests for the pure conversion logic in this module:
// `RunQuery::into_conjure` and `RunUpdate::into_request`. No network calls.
#[cfg(test)]
mod tests {
use super::*;
use chrono::TimeZone;
use nominal_api::objects::scout::run::api::SearchQuery;
// `SearchText` converts to `SearchQuery::SearchText` with the same text.
#[test]
fn query_search_text() {
let q = RunQuery::search_text("hello");
assert_eq!(
q.into_conjure().unwrap(),
SearchQuery::SearchText("hello".into())
);
}
// `SubstringMatch` is sent to the API as `SearchQuery::ExactMatch`.
#[test]
fn query_substring_match() {
let q = RunQuery::substring_match("exact");
assert_eq!(
q.into_conjure().unwrap(),
SearchQuery::ExactMatch("exact".into())
);
}
// `Label` converts to a labels filter containing exactly that label.
#[test]
fn query_label() {
let q = RunQuery::label("my-label");
let SearchQuery::Labels(f) = q.into_conjure().unwrap() else {
panic!("expected Labels variant");
};
assert_eq!(
f.labels(),
[nominal_api::objects::api::Label("my-label".into())]
);
}
// `Property` converts to a properties filter with the given name and value.
#[test]
fn query_property() {
let q = RunQuery::property("key", "val");
let SearchQuery::Properties(f) = q.into_conjure().unwrap() else {
panic!("expected Properties variant");
};
assert_eq!(
f.name(),
&nominal_api::objects::api::PropertyName("key".into())
);
assert_eq!(
f.values(),
[nominal_api::objects::api::PropertyValue("val".into())]
);
}
// `RunNumber` keeps its numeric value through the conversion.
#[test]
fn query_run_number() {
let q = RunQuery::run_number(42);
let SearchQuery::RunNumber(n) = q.into_conjure().unwrap() else {
panic!("expected RunNumber variant");
};
assert_eq!(i64::from(n), 42);
}
// `StartTimeInclusive` becomes a custom timeframe whose start time
// round-trips back to the original timestamp.
#[test]
fn query_start_time_inclusive() {
let dt = Utc.timestamp_opt(1_000_000, 0).single().unwrap();
let q = RunQuery::start_time_inclusive(dt);
let SearchQuery::StartTime(tf) = q.into_conjure().unwrap() else {
panic!("expected StartTime variant");
};
use crate::core::datetime::api_timestamp_to_utc;
use nominal_api::objects::scout::run::api::TimeframeFilter;
let TimeframeFilter::Custom(inner) = *tf else {
panic!("expected Custom timeframe");
};
let got = api_timestamp_to_utc(inner.start_time().unwrap()).unwrap();
assert_eq!(got, dt);
}
// `EndTimeInclusive` becomes a custom timeframe whose end time round-trips
// back to the original timestamp.
#[test]
fn query_end_time_inclusive() {
let dt = Utc.timestamp_opt(2_000_000, 0).single().unwrap();
let q = RunQuery::end_time_inclusive(dt);
let SearchQuery::EndTime(tf) = q.into_conjure().unwrap() else {
panic!("expected EndTime variant");
};
use crate::core::datetime::api_timestamp_to_utc;
use nominal_api::objects::scout::run::api::TimeframeFilter;
let TimeframeFilter::Custom(inner) = *tf else {
panic!("expected Custom timeframe");
};
let got = api_timestamp_to_utc(inner.end_time().unwrap()).unwrap();
assert_eq!(got, dt);
}
// `Not` wraps the converted inner query.
#[test]
fn query_not() {
let q = RunQuery::not(RunQuery::search_text("x"));
let SearchQuery::Not(inner) = q.into_conjure().unwrap() else {
panic!("expected Not variant");
};
assert_eq!(*inner, SearchQuery::SearchText("x".into()));
}
// `And` keeps one converted child per input query.
#[test]
fn query_and_children() {
let q = RunQuery::and([RunQuery::search_text("a"), RunQuery::search_text("b")]);
let SearchQuery::And(children) = q.into_conjure().unwrap() else {
panic!("expected And variant");
};
assert_eq!(children.len(), 2);
}
// `Or` keeps one converted child per input query.
#[test]
fn query_or_children() {
let q = RunQuery::or([RunQuery::label("x"), RunQuery::label("y")]);
let SearchQuery::Or(children) = q.into_conjure().unwrap() else {
panic!("expected Or variant");
};
assert_eq!(children.len(), 2);
}
// Nested combinators convert recursively and preserve child order.
#[test]
fn query_nested_and_or_not() {
let q = RunQuery::and([
RunQuery::label("prod"),
RunQuery::not(RunQuery::or([
RunQuery::property("env", "us"),
RunQuery::property("env", "eu"),
])),
]);
let SearchQuery::And(children) = q.into_conjure().unwrap() else {
panic!("expected And");
};
assert!(matches!(children[0], SearchQuery::Labels(_)));
assert!(matches!(children[1], SearchQuery::Not(_)));
}
// An empty `RunUpdate` leaves every optional request field unset.
#[test]
fn update_empty_request_has_no_optional_fields() {
let req = RunUpdate::new().into_request().unwrap();
assert!(req.title().is_none());
assert!(req.description().is_none());
assert!(req.properties().is_none());
assert!(req.labels().is_none());
assert!(req.start_time().is_none());
assert!(req.end_time().is_none());
}
// `name` maps to the request title; `description` is passed through.
#[test]
fn update_name_and_description() {
let req = RunUpdate::new()
.name("My Run")
.description("desc")
.into_request()
.unwrap();
assert_eq!(req.title(), Some("My Run"));
assert_eq!(req.description(), Some("desc"));
}
// Properties are converted to the API's name/value newtypes.
#[test]
fn update_properties() {
let req = RunUpdate::new()
.properties([("k", "v")])
.into_request()
.unwrap();
let props = req.properties().expect("properties should be set");
assert_eq!(props.len(), 1);
assert_eq!(
props.get(&nominal_api::objects::api::PropertyName("k".into())),
Some(&nominal_api::objects::api::PropertyValue("v".into()))
);
}
// Duplicate labels collapse because the request stores labels as a set.
#[test]
fn update_labels_deduplicated() {
let req = RunUpdate::new()
.labels(["a", "b", "a"])
.into_request()
.unwrap();
let labels = req.labels().expect("labels should be set");
assert_eq!(labels.len(), 2);
}
// Start and end times survive the conversion to API timestamps and back,
// including a sub-second component on the start time.
#[test]
fn update_start_and_end_time_round_trip() {
let start = Utc.timestamp_opt(1_000_000, 500_000_000).single().unwrap();
let end = Utc.timestamp_opt(2_000_000, 0).single().unwrap();
let req = RunUpdate::new()
.start(start)
.end(end)
.into_request()
.unwrap();
use crate::core::datetime::api_timestamp_to_utc;
let got_start = api_timestamp_to_utc(req.start_time().unwrap()).unwrap();
let got_end = api_timestamp_to_utc(req.end_time().unwrap()).unwrap();
assert_eq!(got_start, start);
assert_eq!(got_end, end);
}
}