use serde_json::{json, Value};
use std::sync::Arc;
use std::time::{Duration, Instant};
use synaptic_core::SynapticError;
use tokio::sync::Mutex;
use crate::api::cardkit::CardKitApi;
use crate::api::message::MessageApi;
use crate::LarkConfig;
/// Default value, in milliseconds, of `StreamingCardOptions::throttle`.
const DEFAULT_THROTTLE_MS: u64 = 500;
/// Options controlling how a streaming card is created and how often it is updated.
#[derive(Debug, Clone)]
pub struct StreamingCardOptions {
/// Card header title; when empty, no header is added to the card JSON.
pub title: String,
/// Minimum time that must have elapsed since the last card update before
/// `StreamingCardWriter::write` pushes the buffered content again.
pub throttle: Duration,
}
impl Default for StreamingCardOptions {
    /// Returns options with an empty title and a throttle of
    /// `DEFAULT_THROTTLE_MS` milliseconds.
    fn default() -> Self {
        let title = String::new();
        let throttle = Duration::from_millis(DEFAULT_THROTTLE_MS);
        Self { title, throttle }
    }
}
impl StreamingCardOptions {
    /// Creates options holding the `Default` values.
    pub fn new() -> Self {
        Default::default()
    }

    /// Returns these options with the title replaced by `title`.
    pub fn with_title(self, title: impl Into<String>) -> Self {
        Self {
            title: title.into(),
            ..self
        }
    }

    /// Returns these options with the throttle interval replaced by `dur`.
    pub fn with_throttle(self, dur: Duration) -> Self {
        Self {
            throttle: dur,
            ..self
        }
    }
}
/// Mutable state of a `StreamingCardWriter`, kept behind its mutex.
struct WriterState {
/// Id of the card, as returned by `CardKitApi::create`.
card_id: String,
/// Id of the message carrying the card, as returned by the message API.
message_id: String,
/// Counter incremented before each card update and passed along with it.
sequence: i64,
/// All text written so far; the whole buffer is sent on every update.
content: String,
/// When the card was last updated (initially: when the writer was created).
last_update: Instant,
/// Set by `finish`; once true, `write` fails and `flush`/`finish` do nothing.
finished: bool,
}
/// Element id passed to `CardKitApi::stream_content` when pushing buffered text.
/// It has the same value as the `element_id` of the markdown element produced by
/// `build_card_json_streaming`.
const ELEMENT_ID: &str = "streaming_content";
/// Writer that accumulates text and pushes it into a card created through
/// `CardKitApi`, throttled according to its `StreamingCardOptions`.
pub struct StreamingCardWriter {
/// Client used for every card update issued by this writer.
cardkit: CardKitApi,
/// Card/message ids, sequence counter and text buffer, guarded by an async mutex.
state: Arc<Mutex<WriterState>>,
/// Title and throttle settings supplied when the writer was created.
options: StreamingCardOptions,
}
impl StreamingCardWriter {
pub async fn send(
config: LarkConfig,
receive_id_type: &str,
receive_id: &str,
options: StreamingCardOptions,
) -> Result<Self, SynapticError> {
let cardkit = CardKitApi::new(config.clone());
let msg_api = MessageApi::new(config);
let card_json = build_card_json_streaming(&options.title, "", true);
let card_id = cardkit.create(&card_json).await?;
let content_json = json!({
"type": "card",
"data": { "card_id": &card_id }
})
.to_string();
let message_id = msg_api
.send(receive_id_type, receive_id, "interactive", &content_json)
.await?;
Ok(Self {
cardkit,
state: Arc::new(Mutex::new(WriterState {
card_id,
message_id,
sequence: 0,
content: String::new(),
last_update: Instant::now(),
finished: false,
})),
options,
})
}
pub async fn reply(
config: LarkConfig,
reply_to_message_id: &str,
options: StreamingCardOptions,
) -> Result<Self, SynapticError> {
let cardkit = CardKitApi::new(config.clone());
let msg_api = MessageApi::new(config);
let card_json = build_card_json_streaming(&options.title, "", true);
let card_id = cardkit.create(&card_json).await?;
let content_json = json!({
"type": "card",
"data": { "card_id": &card_id }
})
.to_string();
let message_id = msg_api
.reply(reply_to_message_id, "interactive", &content_json)
.await?;
Ok(Self {
cardkit,
state: Arc::new(Mutex::new(WriterState {
card_id,
message_id,
sequence: 0,
content: String::new(),
last_update: Instant::now(),
finished: false,
})),
options,
})
}
pub async fn write(&self, text: &str) -> Result<(), SynapticError> {
let mut state = self.state.lock().await;
if state.finished {
return Err(SynapticError::Tool(
"StreamingCardWriter: already finished".to_string(),
));
}
state.content.push_str(text);
let elapsed = state.last_update.elapsed();
if elapsed >= self.options.throttle {
self.flush_inner(&mut state).await?;
}
Ok(())
}
pub async fn finish(&self) -> Result<(), SynapticError> {
let mut state = self.state.lock().await;
if state.finished {
return Ok(());
}
state.finished = true;
state.sequence += 1;
let card_json = build_card_json_streaming(&self.options.title, &state.content, false);
self.cardkit
.update(&state.card_id, state.sequence, &card_json)
.await?;
state.last_update = Instant::now();
Ok(())
}
pub async fn flush(&self) -> Result<(), SynapticError> {
let mut state = self.state.lock().await;
if state.finished {
return Ok(());
}
self.flush_inner(&mut state).await
}
pub async fn card_id(&self) -> String {
self.state.lock().await.card_id.clone()
}
pub async fn message_id(&self) -> String {
self.state.lock().await.message_id.clone()
}
async fn flush_inner(&self, state: &mut WriterState) -> Result<(), SynapticError> {
state.sequence += 1;
self.cardkit
.stream_content(&state.card_id, ELEMENT_ID, &state.content, state.sequence)
.await?;
state.last_update = Instant::now();
Ok(())
}
}
/// Builds the card JSON for `title` and `markdown_content` with streaming disabled.
///
/// Delegates to `build_card_json_streaming`, passing `false` for `streaming`.
pub fn build_card_json(title: &str, markdown_content: &str) -> Value {
    let streaming = false;
    build_card_json_streaming(title, markdown_content, streaming)
}
/// Builds the JSON of a schema `2.0` card containing a single markdown element.
///
/// * `title` - header text; when empty, the card gets no `header` entry.
/// * `markdown_content` - content of the markdown element.
/// * `streaming` - when `true`, `streaming_mode` and a `streaming_config` block are
///   added to the card's `config`; otherwise `config` holds only `update_multi`.
///
/// The markdown element's `element_id` is `ELEMENT_ID`, the same id the writer
/// passes to `stream_content`, so the two cannot drift apart.
pub fn build_card_json_streaming(title: &str, markdown_content: &str, streaming: bool) -> Value {
    let mut config = json!({ "update_multi": true });
    if streaming {
        config["streaming_mode"] = json!(true);
        config["streaming_config"] = json!({
            "print_frequency_ms": { "default": 30 },
            "print_step": { "default": 2 },
            "print_strategy": "fast"
        });
    }

    let mut card = json!({
        "schema": "2.0",
        "config": config,
        "body": {
            "elements": [{
                "tag": "markdown",
                "content": markdown_content,
                "element_id": ELEMENT_ID
            }]
        }
    });

    // The header is optional: it is only attached for a non-empty title.
    if !title.is_empty() {
        card["header"] = json!({
            "title": {
                "tag": "plain_text",
                "content": title
            }
        });
    }
    card
}