use std::collections::BTreeMap;
use std::pin::Pin;
use async_stream::try_stream;
use futures_util::Stream;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use crate::client::PageRequestSpec;
use crate::error::{Error, Result};
/// Generic list envelope: a vector of items of type `T` accompanied by
/// `first_id` / `last_id` / `has_more` fields and a catch-all map.
///
/// `object`, `data` and `has_more` fall back to their `Default` value when the
/// corresponding key is missing from the input.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(bound(deserialize = "T: serde::de::DeserializeOwned"))]
pub struct ListEnvelope<T> {
    /// Value of the `object` key; empty string when absent.
    #[serde(default)]
    pub object: String,

    /// Items carried by this envelope; empty when the key is absent.
    #[serde(default)]
    pub data: Vec<T>,

    /// Optional identifier.
    // NOTE(review): presumably the id of the first element of `data` — confirm against the API.
    pub first_id: Option<String>,

    /// Optional identifier.
    // NOTE(review): presumably the id of the last element of `data` — confirm against the API.
    pub last_id: Option<String>,

    /// Value of the `has_more` key; `false` when absent.
    #[serde(default)]
    pub has_more: bool,

    /// Every key not matched by one of the named fields above is collected
    /// here on deserialization and written back inline on serialization.
    #[serde(flatten)]
    pub extra: BTreeMap<String, Value>,
}
/// Generic page of items of type `T` without any cursor fields.
///
/// `object` and `data` fall back to their `Default` value when the
/// corresponding key is missing from the input.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(bound(deserialize = "T: serde::de::DeserializeOwned"))]
pub struct Page<T> {
    /// Value of the `object` key; empty string when absent.
    #[serde(default)]
    pub object: String,

    /// Items carried by this page; empty when the key is absent.
    #[serde(default)]
    pub data: Vec<T>,

    /// Every key not matched by one of the named fields above is collected
    /// here on deserialization and written back inline on serialization.
    #[serde(flatten)]
    pub extra: BTreeMap<String, Value>,
}
/// A page of items of type `T` that can additionally remember the request
/// needed to fetch the page that follows it.
///
/// The serialized form has the same named fields as [`ListEnvelope`]; the
/// stored follow-up request (`next`) is excluded from (de)serialization.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CursorPage<T> {
    /// Value of the `object` key; empty string when absent.
    #[serde(default)]
    pub object: String,

    /// Items carried by this page; empty when the key is absent.
    #[serde(default)]
    pub data: Vec<T>,

    /// Optional identifier.
    // NOTE(review): presumably the id of the first element of `data` — confirm against the API.
    pub first_id: Option<String>,

    /// Optional identifier.
    // NOTE(review): presumably the id of the last element of `data` — confirm against the API.
    pub last_id: Option<String>,

    /// Value of the `has_more` key; `false` when absent.
    #[serde(default)]
    pub has_more: bool,

    /// Every key not matched by one of the named fields above is collected
    /// here on deserialization and written back inline on serialization.
    #[serde(flatten)]
    pub extra: BTreeMap<String, Value>,

    /// Request used to fetch the following page, if one has been attached.
    /// Never serialized; always `None` right after deserialization.
    #[serde(skip)]
    pub(crate) next: Option<PageRequestSpec>,
}
impl<T> Default for CursorPage<T> {
fn default() -> Self {
Self {
object: String::new(),
data: Vec::new(),
first_id: None,
last_id: None,
has_more: false,
extra: BTreeMap::new(),
next: None,
}
}
}
impl<T> From<ListEnvelope<T>> for CursorPage<T> {
fn from(value: ListEnvelope<T>) -> Self {
Self {
object: value.object,
data: value.data,
first_id: value.first_id,
last_id: value.last_id,
has_more: value.has_more,
extra: value.extra,
next: None,
}
}
}
impl<T> CursorPage<T>
where
    T: Clone + Send + Sync + serde::de::DeserializeOwned + 'static,
{
    /// Replaces the stored follow-up request with `next` and returns the page.
    ///
    /// Passing `None` clears any previously attached request.
    pub fn with_next_request(mut self, next: Option<PageRequestSpec>) -> Self {
        self.next = next;
        self
    }

    /// Returns `true` only when `has_more` is set *and* a follow-up request
    /// is attached to this page.
    pub fn has_next_page(&self) -> bool {
        self.has_more && self.next.is_some()
    }

    /// Fetches the page described by the stored follow-up request.
    ///
    /// This method does not consult `has_more`; it only requires that a
    /// follow-up request is attached.
    ///
    /// # Errors
    ///
    /// Returns [`Error::InvalidConfig`] when no follow-up request is attached,
    /// and propagates any error returned by `fetch_cursor_page`.
    pub async fn next_page(&self) -> Result<Self> {
        let next = self
            .next
            .clone()
            .ok_or_else(|| Error::InvalidConfig("当前页面没有下一页游标".into()))?;
        // `next` is moved into the call below, so the client handle has to be
        // cloned out of it first.
        let client = next.client.clone();
        client.fetch_cursor_page(next).await
    }

    /// Consumes the page and returns a stream that yields its items in order,
    /// then keeps fetching and yielding following pages for as long as
    /// [`has_next_page`](Self::has_next_page) is `true`.
    ///
    /// A failed fetch is yielded as a single `Err` item, after which the
    /// stream ends.
    // NOTE(review): presumably silences the edition-2024 drop-order migration
    // lint triggered by the `try_stream!` expansion — confirm.
    #[allow(tail_expr_drop_order)]
    pub fn into_stream(self) -> PageStream<T> {
        Box::pin(try_stream! {
            let mut current = Some(self);
            while let Some(mut page) = current.take() {
                // The page is owned here, so its items are moved out instead of
                // being cloned one by one. Neither `has_next_page` nor
                // `next_page` reads `data`, so leaving it empty is harmless.
                for item in std::mem::take(&mut page.data) {
                    yield item;
                }
                if page.has_next_page() {
                    current = Some(page.next_page().await?);
                }
            }
        })
    }
}
/// Heap-allocated, pinned, `Send` stream whose items are `Result<T>`;
/// this is the type returned by `CursorPage::into_stream`.
pub type PageStream<T> = Pin<Box<dyn Stream<Item = Result<T>> + Send>>;