sws_lua/
scraper.rs

1use std::path::PathBuf;
2use std::{fs, thread};
3
4use crossbeam_channel::{bounded, select, unbounded, Receiver, Sender};
5use mlua::{Function, Lua, LuaSerdeExt};
6use once_cell::sync::OnceCell;
7use serde::{Deserialize, Serialize};
8use sws_crawler::{
9    CrawlerConfig, CrawlingContext, OnError, PageLocation, Scrapable, ScrapingContext, Seed,
10};
11use sws_scraper::Html;
12
13use crate::interop::{LuaCrawlingContext, LuaDate, LuaHtml, LuaScrapingContext, LuaStringRecord};
14use crate::ns::{globals, sws};
15use crate::writer;
16
/// Process-wide handles to the CSV writer thread, initialised on first use.
///
/// Tuple fields, in order:
/// 1. sender for the records to be written,
/// 2. sender used to ask the writer thread to stop,
/// 3. receiver signalled by the writer thread once it has handled the stop request.
static TX_CSV_WRITER: OnceCell<(Sender<csv::StringRecord>, Sender<()>, Receiver<()>)> =
    OnceCell::new();
19
/// Settings needed to build a [`LuaScraper`].
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct LuaScraperConfig {
    /// Path of the Lua script to load and execute.
    pub script: PathBuf,
    /// Destination CSV file; when `None`, records are written to stdout.
    pub csv_file: Option<PathBuf>,
    /// How `csv_file` is opened; the mode's default is used when `None`.
    pub file_mode: Option<writer::FileMode>,
}
26
/// Scraper whose page-scraping and URL-filtering logic is provided by a Lua script.
pub struct LuaScraper {
    /// Lua state in which the user script has been executed.
    lua: Lua,
    /// Crawling seed read from the script's `sws` table.
    seed: Seed,
    /// Sender used to hand records over to the CSV writer thread.
    tx_record: Sender<csv::StringRecord>,
}
32
33impl Scrapable for LuaScraper {
34    type Config = LuaScraperConfig;
35
36    fn new(config: &LuaScraperConfig) -> anyhow::Result<Self> {
37        let lua = Lua::new();
38        let globals = lua.globals();
39
40        // Load and check script
41
42        let sws = lua.create_table()?;
43        globals.set(globals::SWS, sws)?;
44        lua.load(&fs_err::read_to_string(&config.script)?).exec()?;
45        let _: Function = globals.get(globals::SCRAP_PAGE)?;
46
47        if globals
48            .get::<_, Option<Function>>(globals::ACCEPT_URL)?
49            .is_none()
50        {
51            let accept_url =
52                lua.create_function(|_, (_url, _ctx): (String, LuaCrawlingContext)| Ok(true))?;
53            globals.set(globals::ACCEPT_URL, accept_url)?;
54        }
55
56        // Setup sws namespace
57
58        let sws = globals.get::<_, mlua::Table>(globals::SWS)?;
59
60        let new_record = lua.create_function(|_, ()| Ok(LuaStringRecord::default()))?;
61        sws.set(sws::RECORD, new_record)?;
62
63        let new_date =
64            lua.create_function(|_, (d, fmt): (String, String)| LuaDate::new(&d, &fmt))?;
65        sws.set(sws::DATE, new_date)?;
66
67        let location = lua.create_table()?;
68        location.set(sws::location::PATH, sws::location::PATH)?;
69        location.set(sws::location::URL, sws::location::URL)?;
70        sws.set(sws::LOCATION, location)?;
71
72        let sitemap = lua.create_table()?;
73        sitemap.set(sws::sitemap::INDEX, sws::sitemap::INDEX)?;
74        sitemap.set(sws::sitemap::URL_SET, sws::sitemap::URL_SET)?;
75        sws.set(sws::SITEMAP, sitemap)?;
76
77        // Retrieve custom values
78
79        let sitemap_urls: Option<Vec<String>> = sws.get(sws::SEED_SITEMAPS).map_err(|e| {
80            mlua::Error::RuntimeError(format!(
81                "Couldn't read {}.{} got: {}",
82                globals::SWS,
83                sws::SEED_SITEMAPS,
84                e
85            ))
86        })?;
87
88        let seed_urls: Option<Vec<String>> = sws.get(sws::SEED_PAGES).map_err(|e| {
89            mlua::Error::RuntimeError(format!(
90                "Couldn't read {}.{} got: {}",
91                globals::SWS,
92                sws::SEED_PAGES,
93                e
94            ))
95        })?;
96
97        let seed_robots: Option<String> = sws.get(sws::SEED_ROBOTS_TXT).map_err(|e| {
98            mlua::Error::RuntimeError(format!(
99                "Couldn't read {}.{} got: {}",
100                globals::SWS,
101                sws::SEED_ROBOTS_TXT,
102                e
103            ))
104        })?;
105
106        let seed = match (sitemap_urls, seed_urls, seed_robots) {
107            (Some(urls), None, None) => Seed::Sitemaps(urls),
108            (None, Some(urls), None) => Seed::Pages(urls),
109            (None, None, Some(url)) => Seed::RobotsTxt(url),
110            _ => anyhow::bail!(
111                "Invalid seed, requires exactly one of: {ns}.{s1}, {ns}.{s2}, {ns}.{s3}",
112                ns = globals::SWS,
113                s1 = sws::SEED_SITEMAPS,
114                s2 = sws::SEED_PAGES,
115                s3 = sws::SEED_ROBOTS_TXT
116            ),
117        };
118
119        let csv_config: writer::CsvWriterConfig = sws
120            .get::<_, Option<mlua::Value>>(sws::CSV_WRITER_CONFIG)?
121            .map(|h| lua.from_value(h))
122            .unwrap_or_else(|| Ok(writer::CsvWriterConfig::default()))?;
123
124        // Register sws namespace
125
126        globals.set(globals::SWS, sws)?;
127        drop(globals);
128
129        // Setup csv writer
130
131        let (tx_record, _, _) = TX_CSV_WRITER.get_or_try_init::<_, anyhow::Error>(move || {
132            let (tx_record, rx_record) = unbounded::<csv::StringRecord>();
133            let (tx_stop, rx_stop) = bounded::<()>(1);
134            let (tx_done, rx_done) = bounded::<()>(1);
135
136            let mut wtr = match &config.csv_file {
137                Some(path) => {
138                    let opts: fs_err::OpenOptions = config
139                        .file_mode
140                        .as_ref()
141                        .map(Clone::clone)
142                        .unwrap_or_default()
143                        .into();
144                    let wtr = csv::WriterBuilder::from(&csv_config).from_writer(opts.open(path)?);
145                    writer::CsvWriter::File(wtr)
146                }
147                None => {
148                    let wtr = csv::WriterBuilder::from(&csv_config).from_writer(std::io::stdout());
149                    writer::CsvWriter::Stdout(wtr)
150                }
151            };
152
153            thread::spawn(move || loop {
154                select! {
155                    recv(rx_stop) -> _ => {
156                        wtr.flush().ok();
157                        tx_done.send(()).ok();
158                        break;
159                    },
160                    recv(rx_record) -> msg => {
161                        msg.map(|record| wtr.write_record(record.into_iter()))
162                            .map(|res| if let Err(e) = res {
163                                log::error!("Couldn't write record: {e}");
164                            })
165                            .ok();
166                    }
167                }
168            });
169
170            Ok((tx_record, tx_stop, rx_done))
171        })?;
172
173        // Setup context
174
175        Ok(Self {
176            lua,
177            seed,
178            tx_record: tx_record.clone(),
179        })
180    }
181
182    fn finalizer(&mut self) {
183        TX_CSV_WRITER.get().map(|(_, tx_stop, rx_done)| {
184            tx_stop.send(()).ok();
185            rx_done.recv().ok()
186        });
187    }
188
189    fn seed(&self) -> Seed {
190        self.seed.clone()
191    }
192
193    fn scrap(&mut self, page: String, scraping_context: ScrapingContext) -> anyhow::Result<()> {
194        let scrap_page: Function = self
195            .lua
196            .globals()
197            .get(globals::SCRAP_PAGE)
198            .unwrap_or_else(|_| panic!("Function {} not found", globals::SCRAP_PAGE)); // Ensured in constructor
199
200        let page = LuaHtml(Html::parse_document(&page));
201        let ctx = LuaScrapingContext::new(self.tx_record.clone(), scraping_context);
202
203        scrap_page
204            .call::<_, ()>((page, ctx))
205            .map_err(|e| anyhow::anyhow!(e.to_string().replace('\n', "")))
206    }
207
208    fn accept(&self, url: &str, crawling_ctx: CrawlingContext) -> bool {
209        let accept_url: Function = self
210            .lua
211            .globals()
212            .get(globals::ACCEPT_URL)
213            .unwrap_or_else(|_| panic!("Function {} not found", globals::ACCEPT_URL)); // Ensured in constructor
214
215        let ctx: LuaCrawlingContext = crawling_ctx.clone().into();
216        match accept_url.call::<_, bool>((url.to_string(), ctx)) {
217            Ok(accepted) => accepted,
218            Err(e) => {
219                log::error!(
220                    "Couldn't process URL {url} ({crawling_ctx:?}) in function {}: {}",
221                    globals::ACCEPT_URL,
222                    e.to_string().replace('\n', "")
223                );
224                false
225            }
226        }
227    }
228}
229
230impl TryFrom<&LuaScraperConfig> for CrawlerConfig {
231    type Error = anyhow::Error;
232
233    fn try_from(c: &LuaScraperConfig) -> Result<Self, Self::Error> {
234        let lua = Lua::new();
235        let globals = lua.globals();
236
237        let sws = lua.create_table()?;
238        globals.set(globals::SWS, sws)?;
239        lua.load(&fs_err::read_to_string(&c.script)?).exec()?;
240
241        let crawler_config: CrawlerConfig = globals
242            .get::<_, mlua::Table>(globals::SWS)?
243            .get::<_, Option<mlua::Value>>(sws::CRAWLER_CONFIG)?
244            .map(|h| lua.from_value(h))
245            .unwrap_or_else(|| Ok(CrawlerConfig::default()))?;
246
247        Ok(crawler_config)
248    }
249}
250
251pub fn scrap_glob(
252    config: &LuaScraperConfig,
253    pattern: &str,
254    on_error: OnError,
255    num_workers: usize,
256) -> anyhow::Result<()> {
257    let (tx_path, rx_path) = unbounded::<PathBuf>();
258
259    let mut workers = vec![];
260    for id in 0..num_workers {
261        let rx_path = rx_path.clone();
262        let config = config.clone();
263        let worker = thread::Builder::new()
264            .name(format!("{id}"))
265            .spawn(move || {
266                let mut scraper = LuaScraper::new(&config)?;
267                for path in rx_path.into_iter() {
268                    let page = fs::read_to_string(&path)?;
269                    let ctx = ScrapingContext::with_location(PageLocation::Path(path));
270                    match scraper.scrap(page, ctx) {
271                        Ok(()) => (),
272                        Err(e) => match on_error {
273                            OnError::SkipAndLog => {
274                                log::error!("Skipping page scrap: {e}");
275                            }
276                            OnError::Fail => {
277                                return Err(e);
278                            }
279                        },
280                    }
281                }
282                Ok::<(), anyhow::Error>(())
283            })?;
284        workers.push(worker);
285    }
286
287    for path in glob::glob(pattern)? {
288        tx_path.send(path?).ok();
289    }
290    drop(tx_path);
291
292    for w in workers {
293        w.join().unwrap()?;
294    }
295
296    Ok(())
297}
298
299pub fn scrap_page(
300    config: &LuaScraperConfig,
301    page: String,
302    location: PageLocation,
303) -> anyhow::Result<()> {
304    let mut scraper = LuaScraper::new(config)?;
305    scraper.scrap(page, ScrapingContext::with_location(location))?;
306    scraper.finalizer();
307    Ok(())
308}