Crate openai_flows


OpenAI integration for Flows.network

Quick Start

To get started, let’s write a tiny flow function.

use openai_flows::{
    chat::ChatOptions,
    OpenAIFlows,
};
use lambda_flows::{request_received, send_response};
use serde_json::Value;
use std::collections::HashMap;

#[no_mangle]
#[tokio::main(flavor = "current_thread")]
pub async fn run() {
    request_received(handler).await;
}

async fn handler(_qry: HashMap<String, Value>, body: Vec<u8>) {
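    // Forward the request body to OpenAI as a chat message. On failure,
    // chat_completion's error text is used as the response body.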
    let co = ChatOptions::default();
    let of = OpenAIFlows::new();

    let r = match of.chat_completion(
        "any_conversation_id",
        String::from_utf8_lossy(&body).into_owned().as_str(),
        &co,
    )
    .await
    {
        Ok(c) => c.choice,
        Err(e) => e,
    };
     
    send_response(
        200,
        vec![(
            String::from("content-type"),
            String::from("text/plain; charset=UTF-8"),
        )],
        r.as_bytes().to_vec(),
    );
}

When a Lambda request is received, the handler passes the request body to [chat_completion] and sends the model's reply back as the response.
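The example above uses the default [ChatOptions]. The options can be tuned before the call; here is a minimal sketch, inside a handler like the one above, using only the fields exercised in the real-world example below (model, restart, and system_prompt; the prompt text itself is illustrative):

use openai_flows::chat::{ChatModel, ChatOptions};

let co = ChatOptions {
    // Choose the model explicitly rather than relying on the default.
    model: ChatModel::GPT35Turbo16K,
    // Start a fresh conversation for this conversation id.
    restart: true,
    // A standing instruction applied to every request.
    system_prompt: Some("You are a helpful assistant."),
    ..ChatOptions::default()
};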

Real World Example

HackerNews Alert uses the openai_flows chat_completion function to summarize news articles.

use anyhow;
use dotenv::dotenv;
use http_req::request;
use openai_flows::{
    chat::{ChatModel, ChatOptions},
    OpenAIFlows,
};
use schedule_flows::schedule_cron_job;
use serde::{Deserialize, Serialize};
use slack_flows::send_message_to_channel;
use std::env;
use std::time::{SystemTime, UNIX_EPOCH};
use web_scraper_flows::get_page_text;

#[no_mangle]
pub fn run() {
    dotenv().ok();
    let keyword = env::var("KEYWORD").unwrap_or("chatGPT".to_string());
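    // "02 * * * *" schedules the callback at minute 2 of every hour.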
    schedule_cron_job(String::from("02 * * * *"), keyword, callback);
}

#[no_mangle]
#[tokio::main(flavor = "current_thread")]
async fn callback(keyword: Vec<u8>) {
    let workspace = env::var("slack_workspace").unwrap_or("secondstate".to_string());
    let channel = env::var("slack_channel").unwrap_or("github-status".to_string());
    let query = String::from_utf8_lossy(&keyword);
    let now = SystemTime::now();
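    // Limit the search to stories created within the last hour.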
    let dura = now.duration_since(UNIX_EPOCH).unwrap().as_secs() - 3600;
    let url = format!("https://hn.algolia.com/api/v1/search_by_date?tags=story&query={query}&numericFilters=created_at_i>{dura}");
    let mut writer = Vec::new();
    if request::get(url, &mut writer).is_ok() {
        if let Ok(search) = serde_json::from_slice::<Search>(&writer) {
            for hit in search.hits {
                let title = &hit.title;
                let url = &hit.url;
                let object_id = &hit.object_id;
                let author = &hit.author;
                let post = format!("https://news.ycombinator.com/item?id={object_id}");

                match url {
                    Some(u) => {
                        let source = format!("(<{u}|source>)");
                        if let Ok(text) = get_page_text(u).await {
                            let text = text.split_whitespace().collect::<Vec<&str>>().join(" ");
                            match get_summary_truncated(&text).await {
                                Ok(summary) => {
                                    let msg = format!(
                                        "- *{title}*\n<{post} | post>{source} by {author}\n{summary}"
                                    );
                                    send_message_to_channel(&workspace, &channel, msg).await;
                                }
                                Err(_e) => {
                                    // Skip stories whose summary request failed.
                                }
                            }
                        }
                    }
                    None => {
                        if let Ok(text) = get_page_text(&post).await {
                            let text = text.split_whitespace().collect::<Vec<&str>>().join(" ");
                            if let Ok(summary) = get_summary_truncated(&text).await {
                                let msg =
                                    format!("- *{title}*\n<{post} | post> by {author}\n{summary}");
                                send_message_to_channel(&workspace, &channel, msg).await;
                            }
                        }
                    }
                };
            }
        }
    }
}

#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct Search {
    pub hits: Vec<Hit>,
}

#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct Hit {
    pub title: String,
    pub url: Option<String>,
    #[serde(rename = "objectID")]
    pub object_id: String,
    pub author: String,
    pub created_at_i: i64,
}
// The text of a news page can exceed the model's maximum token limit, so it is
// truncated to 11,000 space-delimited words here; when tokenized, the total
// token count should stay within the 16K limit of the model used.
async fn get_summary_truncated(inp: &str) -> anyhow::Result<String> {
    let mut openai = OpenAIFlows::new();
    openai.set_retry_times(3);

    let news_body = inp
        .split_ascii_whitespace()
        .take(11000)
        .collect::<Vec<&str>>()
        .join(" ");

    // A fixed conversation id; with restart set below, every call is independent.
    let chat_id = String::from("news-summary-N");
    let system = "You're a news editor AI.";

    let co = ChatOptions {
        model: ChatModel::GPT35Turbo16K,
        restart: true,
        system_prompt: Some(system),
        ..ChatOptions::default()
    };

    let question = format!("Make a concise summary within 200 words on this: {news_body}.");

    match openai.chat_completion(&chat_id, &question, &co).await {
        Ok(r) => Ok(r.choice),
        Err(e) => Err(anyhow::Error::msg(e.to_string())),
    }
}
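Two details in this helper are worth noting: restart: true makes each summary an independent conversation, so context from one article cannot leak into the next, and set_retry_times(3) retries transient OpenAI API failures a few times before the caller gives up and skips the story.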

Modules

Structs

  • OpenAIFlows — The main struct for setting the basic configuration for the OpenAI interface.

Enums